个人笔记

专注互联网

CGI网关服务器漫谈

作为一名C/C++转后端的程序员,习惯于socket的裸(在tcp/ip下面操作系统还隐藏了很多细节,这里姑且不论),会纠结一个问题,为什么python还需要wsgi服务器夹在中间呢?

在一个web请求到达实际的业务逻辑,需要(但不限于)经过

  1. 处理tcp连接,读写二进制数据
  2. 解析http协议,并判断合法性
  3. 检查参数合法性
  4. 检查权限/访问控制
  5. 安全防护检查
  6. 特定语言的参数序列化(绑定)

而业务逻辑处理之后,又需要(但不限于)经过

  1. 反序列化数据(比如对象转json/xml)
  2. 格式化输出(比如html模板引擎)

除了各种中间服务器,还有各种web框架,例如Python flask

对于socket处理,Http(s)协议解析,nginx/apache等成熟服务器是强项,由于是通用的服务器,并不会和特定的语言绑定,所以要和用特定程序语言实现的web业务逻辑配合工作,就需要一定的协议来交互,这就产生了CGI。

并不是所有的后端语言都需要独立服务器来配合处理网络和http协议,Golang就是一个例外,Golang内置的Http库非常完善(也有很高效的第三方库,比如fasthttp),所以Go web框架大都把这块直接包进去了。

这种做法在部署上有更好的灵活性和便捷性,特别是配合docker,对比spring boot打包jetty直接运行

对于其他语言,例如python,通常会采用Http <--> WSGI <--> WEB框架 <--> 业务代码 的方式

并非python无法实现类似与Golang的大一统集成方案,理论上什么语言都是可行的,不过基于性能上的考虑,或者避免重复造轮子等。

这种方案利用了各自的优势,协同完成整个事情,但他们并非物理分割的实体,比如uWSGI或者gunicorn就实现了http服务器和wsgi协议处理,

在很多文章中,例如【Flask+uwsgi+Nginx】,会再附加一个nginx在最前面,这更多的是为了扩展其他一些功能,比如

  1. 反向代理
  2. 负责均衡
  3. 代理静态资源
  4. https

nginx负载均衡在应用层处理,在一些高性能场景,可能需要特殊定制的负载均衡服务器,直接在内核的ip层/数据链路层作处理

scgi的一个简单实现如wsgiref.simple_server,基于它很容易写出一个简单的Python web程序,或者扩展成一个web框架

from wsgiref.simple_server import make_server

def my_app(environ, start_response):
"""a simple wsgi application"""
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [u"你好! 歡迎來到Victor的第一個WSGI程式".encode('utf8')]

httpd = make_server('', 8000, my_app)
print("Serving on port 8000...")
httpd.serve_forever()

借助uWSGI也很容易实现

def application(environ, start_response):
status = '200 OK'
output = 'Hello World!'

response_headers = [('Content-type', 'text/plain'),
('Content-Length', str(len(output)))]
start_response(status, response_headers)

return [output]

pip install uwsgi
king@king:~/code/bug$ uwsgi --version
2.0.15
king@king:~/code/bug$ uwsgi --http :9090 --wsgi-file server.py

wcgi利用了语言的特性,直接通过调用和回调的方式实现高效请求转发和回传,wcgi和业务代码就在同个进程中,而最早的cgi并非这样

最早的CGI是很灵活的,收到请求后CGI网关会启动子进程,并通过IPC(通常的做法是将stdin/stdout对接到输入输出)发送请求和接收结果,处理完成,子进程退出。

nging配置cgi比较繁琐,可以通过nc来模拟一些这个流程

#include <stdio.h>
int main()
{
char buf_[1024];
do{
if(gets(buf_))
{
printf("recv : %s\n",buf_);
fflush(stdout);
}
}while(1);
return 0;
}

server

king@king:/tmp$ gcc a.c
king@king:/tmp$ rm -f /tmp/f; mkfifo /tmp/f
king@king:/tmp$ cat /tmp/f | ./a.out | nc -l -p 12345 127.0.0.1 > /tmp/f

client

king@king:/tmp$ nc 127.0.0.1 12345
1
recv : 1
2
recv : 2
3
recv : 3
4
recv : 4
5
recv : 5
6
recv : 6

CGI的设计可以支持各种不同语言的程序,问题是性能太差,所以又搞出了什么fastcgi。

FastCGI,顾名思义为更快的 CGI,它允许在一个进程内处理多个请求,而不是一个请求处理完毕就直接结束进程,性能上有了很大的提高。这其中又以Php的PHP-FPM比较典型,和Python的网关服务器将Http协议处理部分自行处理不同,Php网关服务器通常配合第三方Http服务器使用,例如nginx或者lighttpd

PHP-FPM作为一个独立的网关服务器存在,和nginx类似,会有一个master进程接收请求(nginx的master进程做的事情少得多)和若干worker进程,每个worker进程会跑一个PHP解释器,是 PHP 代码真正执行的地方。master进程则监听端口,并接收来自Http服务器的请求,通常情况下使用unixsocket。

nginx作为一个通用的服务器,为了配合PHP-FPM,需要使用插件fastcgi_param,根据配置该插件会作一些转换并请求代理到FPM的地址。

处理好了Http请求分析,调度传递之后,还有很多事情需要处理,如前面提到的【检查参数合法性】、【检查权限/访问控制】等,这就是web框架的事情了,web框架是为了提高开发效率,让开发者更聚焦于业务逻辑,直接裸写也是OK的。

证书与Nginx部署

证书类别

按照审核方式,一般的CA机构提供三种类型的SSL证书:域名型SSL(DV SSL),企业型SSL(OV SSL)以及增强型SSL(EV SSL)证书。

三种证书底层的技术实现是一致的,区别在于证书发布方对申请方的审核程度,在证书的颁发对象中有很多字段,比如

  1. CN Common Name
  2. OU Organization Unit
  3. O Organization Name
  4. L Locality
  5. S State/Provice
  6. C Country

一个典型的EV证书颁发对象(工商银行),可以看到各种字段非常详尽

CN = corporbank-simp.icbc.com.cn
OU = Software Development Center
O = Industrial and Commercial Bank of China Limited
STREET = NO.55 Fuxingmen Nei Street Xicheng District, Beijing
L = Beijing
S = Beijing
PostalCode = 100140
C = CN
SERIALNUMBER = 100000000003965
2.5.4.15 = Private Organization
1.3.6.1.4.1.311.60.2.1.2 = BEIJING
1.3.6.1.4.1.311.60.2.1.3 = CN

百度的OV证书

CN = baidu.com
OU = service operation department.
O = BeiJing Baidu Netcom Science Technology Co., Ltd
L = beijing
S = beijing
C = CN

在腾讯云上申请的DV证书,就剩下可怜的一行,这种免费证书不支持多域名和泛域名(比如*.qiujinwu.com),只能写死某个域名。

CN = ycy.qiujinwu.com

关于证书的支持范围

单域名 多域名 泛域名 多泛域名
DV 支持 支持 不支持 不支持
OV 支持 支持 支持 支持
EV 支持 支持 不支持 不支持
举例 www.barretlee.com www.barretlee.com
www.xiaohuzige.com
www.barret.cc
*.barretlee.com .barretlee.com
.xiaohuzige.com
*.barret.cc

配置Nginx

配置服务器证书即可,实现Https的访问,对于上面的免费DV证书,就可以实现全网的可信任Https

server {
listen 443 ssl;
root /var/www/html;
ssl on;
ssl_certificate /tmp/a/server.crt;
ssl_certificate_key /tmp/a/server.key;

ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;

ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
}

自建CA

证书支持一个树状的层级认证链关系,例如Google首页的证书

Builtin Object Token:GeoTrust Global CA
Google Internet Authority G2
*.google.com.hk

按照通行的做法,会建多级ca,一个root ca和若干个和若干层intermediate ca,然后再一堆的服务器/客户端证书,此内容重点参考http://www.barretlee.com/blog/2016/04/24/detail-about-ca-and-certs/

工具shell

由于创建公私钥,证书、签名的过程需要输入密码等各种参数,为了方便测试,下面的脚本进行了封装,支持批量化运行

#!/bin/bash
# 常用后缀
key=".key" # 私(公)钥
crt=".crt" # 证书
csr=".csr" # 证书签名请求
p12=".p12" # 打包了私钥的证书

# 创建私(公)钥
create_key(){
# 加密方式,可以但不限这些,
# aes256/des3/aes128

# 2048 可以作为参数,亦可调整大小
name="${1}${key}"
if [ "${#}" -gt 1 ];then
openssl genrsa -aes256 -passout pass:${2} -out "${name}" 2048
else
# 不填写参数则无密码
openssl genrsa -out "${name}" 2048
fi
}

# 移除私(公)钥的密码
remove_key_pwd(){
# notify passin & passout
name="${1}${key}"
if [ "${#}" -gt 1 ];then
openssl rsa -in "${name}" -passin pass:"${2}" -out "${name}"
else
openssl rsa -in "${name}" -out "${name}"
fi
}

# 输出证书详情
check_crt(){
openssl x509 -noout -text -in "${1}${crt}"
}

# 基于测试,SUJECT作为一个全局的参数存在,除了Common Name
C="CN"
ST="GD"
L="SZ"
O="0MyCompany"
OU="Department"

# 自签名
self_sign(){
# SUJECT 的Common Name
common_name=$1
output=$2
# 是否输入了密码
password_param=""
if [ "${#}" -gt 2 ];then
password_param="-passin pass:${3}"
fi

SUBJECT="/C=${C}/ST=${ST}/L=${L}/O=${O}/OU=${OU}/CN=${common_name}"
# 留意-config -extensions
openssl req -config `pwd`/ca_openssl.cnf \
${password_param} -subj ${SUBJECT} \
-extensions v3_ca \
-key "${output}${key}" -new -x509 \
-days 7200 -sha256 -out "${output}${crt}"
}

# 创建证书签名请求
create_csr(){
# SUJECT 的Common Name
common_name=$1
output=$2
# 和函数self_sign一样
SUBJECT="/C=${C}/ST=${ST}/L=${L}/O=${O}/OU=${OU}/CN=${common_name}"
# 是否输入了密码
password_param=""
if [ "${#}" -gt 2 ];then
password_param="-passin pass:${3}"
fi
# 留意-config -extensions
openssl req -config `pwd`/imm_openssl.cnf ${password_param} -new -sha256 -subj ${SUBJECT} \
-key "${output}${key}" -out "${output}${csr}"
}

# ca签名,包括root ca给intermediate签名,以及intermediate ca和业务证书签名
ca_sign(){
input=$1
cafile=$2
# 是否输入了密码
password_param=""
if [ "${#}" -gt 2 ];then
password_param="-passin pass:${3}"
fi

extensionsparam=""
cnfparam="-config `pwd`/imm_openssl.cnf"
if [ "${cafile}" == "ca" ];then
# root ca签名的情形,使用ca_openssl.cnf,以及指定extensions
cnfparam="-config `pwd`/ca_openssl.cnf"
extensionsparam="-extensions v3_intermediate_ca"
fi

# -batch without (Y/N)prompt
openssl ca ${cnfparam} ${password_param} ${extensionsparam} \
-batch -days 3600 -notext -md sha256 \
-in "${input}${csr}" -out "${input}${crt}" #\
# 由于使用了conf指定ca,这些参数是多余的
#-cert "${cafile}${crt}" -keyfile "${cafile}${key}"
}

# 验证证书
verify_crt(){
input=${1}
cafile=${2}
openssl verify -CAfile "${cafile}${crt}" "${input}${crt}"
}

# 将证书和私钥打包成p12格式
export_p12() {
input=${1}
opassword=${2}
# 是否输入了密码
password_param=""
if [ "${#}" -gt 2 ];then
password_param="-passin pass:${3}"
fi
openssl pkcs12 ${password_param} -passout pass:${opassword} \
-export -clcerts -in "${input}${crt}" \
-inkey "${input}${key}" -out "${input}${p12}"
}

由于不同的ca有不同的会话环境,添加工具函数

create_conf(){
rdir="${1}"
# 生成环境应该考虑相对路径,以及安全性的问题
if [ "${rdir}" == "/" ];then
return
else
rm -rf "${rdir}"
fi

mkdir -p "${rdir}"
cd "${rdir}"
mkdir -p certs crl csr newcerts private
touch index.txt
echo 1000 > serial
cd -
}

创建证书

# 全局的密码,方便测试
pass=123456
# ca chain,方便@1批量导入@2nginx需要
cachain="cachain"

# 创建证书(含intermediate证书和业务证书)并签名
create_and_sign(){
# 输入文件,不含后缀
output=${1}
# common name
cname=${2}
# ca文件名称,不含后缀
caname=${3}
# 创建证书私钥
create_key ${output} ${pass}
# 创建证书签名请求
create_csr ${cname} ${output} ${pass}
# 用指定的ca进行签名
ca_sign ${output} ${caname} ${pass}
# 打印信息
check_crt ${output}
# 校验
verify_crt ${output} ${cachain}
# 导出p12证书
export_p12 ${output} ${pass} ${pass}

if [ ${caname} = "ca" ];then
# intermediate证书,需要更新cachain并且放置在最签名,rootca在最底部(注意顺序)
tmpfile="/tmp/$$_tmp_$$"
cat "${output}${crt}" | cat - "${cachain}${crt}" > "${tmpfile}" \
&& mv "${tmpfile}" "${cachain}${crt}"
else
# 业务证书移除密码
remove_key_pwd "${output}" ${pass}
fi
}

# 在这里创建一个root ca和intermediate ca
# init两个ca的环境,置于如何关联到这两个目录,在conf文件中配置
create_conf ca
create_conf imm

# 创建root ca的私钥
create_key ca ${pass}
# 自签名
self_sign root_ca ca ${pass}
check_crt ca
# 更新到cachain
cat "ca${crt}" > "${cachain}${crt}"

# 创建intermediate ca 证书
create_and_sign intermediate intermediate_ca ca
# 创建服务器证书,common name必须和域名匹配
create_and_sign server "ycy.qiujinwu.com" intermediate
# 创建客户证书,用于作客户端认证(双向认证)
create_and_sign kingqiu "kingqiu" intermediate

cnf配置

上述的脚本有两个cnf文件,分别是ca_openssl.cnf和imm_openssl.cnf,原始文件在https://github.com/barretlee/autocreate-ca

wget -O ca_openssl.cnf \
https://raw.githubusercontent.com/barretlee/autocreate-ca/master/cnf/root-ca
wget -O imm_openssl.cnf \
https://raw.githubusercontent.com/barretlee/autocreate-ca/master/cnf/intermediate-ca

两者的区别

king@king:/tmp/c$ diff *.cnf
1,2c1,2
< # OpenSSL root CA configuration file.
< # Copy to `/root/ca/openssl.cnf`.
---
> # OpenSSL intermediate CA configuration file.
> # Copy to `/root/ca/intermediate/openssl.cnf`.
10c10
< dir = /root/ca
---
> dir = /root/ca/intermediate
19,20c19,20
< private_key = $dir/private/ca.key.pem
< certificate = $dir/certs/ca.cert.pem
---
> private_key = $dir/private/intermediate.key.pem
> certificate = $dir/certs/intermediate.cert.pem
24c24
< crl = $dir/crl/ca.crl.pem
---
> crl = $dir/crl/intermediate.crl.pem
35c35
< policy = policy_strict
---
> policy = policy_loose

这些配置和运行上述脚本的路径,以及文件名相关,我做了如下修改

king@king:/tmp/b$ diff ca_openssl.cnf imm_openssl.cnf 
1,2c1,2
< # OpenSSL root CA configuration file.
< # Copy to `/root/ca/openssl.cnf`.
---
> # OpenSSL intermediate CA configuration file.
> # Copy to `/root/ca/intermediate/openssl.cnf`.
10c10
< dir = /tmp/b/ca
---
> dir = /tmp/b/imm
19,20c19,20
< private_key = $dir/../ca.key
< certificate = $dir/../ca.crt
---
> private_key = $dir/../intermediate.key
> certificate = $dir/../intermediate.crt
24c24
< crl = $dir/ca.crl.pem
---
> crl = $dir/intermediate.crl.pem
35c35
< policy = policy_strict
---
> policy = policy_loose

注意:

  1. CA_default/dir的路径和证书的路径匹配
  2. private_key、certificate和crl的路径正确
[ CA_default ]
# Directory and file locations.
dir = /tmp/b/ca
certs = $dir/certs
crl_dir = $dir/crl
new_certs_dir = $dir/newcerts
database = $dir/index.txt
serial = $dir/serial
RANDFILE = $dir/.rand

# The root key and root certificate.
private_key = $dir/../ca.key
certificate = $dir/../ca.crt

# For certificate revocation lists.
crlnumber = $dir/crlnumber
crl = $dir/ca.crl.pem
crl_extensions = crl_ext
default_crl_days = 30

运行之后,生成如下文件

.
├── ca
│ ├── certs
│ ├── crl
│ ├── csr
│ ├── index.txt
│ ├── index.txt.attr
│ ├── index.txt.old
│ ├── newcerts
│ ├── private
│ ├── serial
│ └── serial.old
├── cachain.crt
├── ca.crt
├── ca.key
├── ca_openssl.cnf
├── imm
│ ├── certs
│ ├── crl
│ ├── csr
│ ├── index.txt
│ ├── index.txt.attr
│ ├── index.txt.attr.old
│ ├── index.txt.old
│ ├── newcerts
│ ├── private
│ ├── serial
│ └── serial.old
├── imm_openssl.cnf
├── intermediate.crt
├── intermediate.csr
├── intermediate.key
├── intermediate.p12
├── kingqiu.crt
├── kingqiu.csr
├── kingqiu.key
├── kingqiu.p12
├── server.crt
├── server.csr
├── server.key
└── server.p12

配置Nginx服务器证书

server {
listen 443 ssl;
root /var/www/html;
ssl on;
ssl_certificate /tmp/b/server.crt;
ssl_certificate_key /tmp/b/server.key;
}

添加Host之后即可访问https://ycy.qiujinwu.com/index.nginx-debian.html,由于是自建ca,浏览器是不信任的

Linux下添加信任

chrome和opera打开的界面都是一样的,并且数据互通。打开【证书管理器】之后,打开tab【授权中心】,选择【cachain.crt】,就一次性将root ca和intermediate ca一并导入了(必须都导入,否则无法通过信任,业务证书也无法看到完整的证书链),

Windows下添加信任

双击ca.crt和intermediate.crt依次安装到【受信任的根证书颁发机构】

再次刷新,即可看到锁变绿了,chrome有时需要关闭重新打开才能信任。

双向认证

注意nginx客户端的ca证书,使用的是cachain.crt证书链

server {
listen 443 ssl;
root /var/www/html;
ssl on;
ssl_certificate /tmp/b/server.crt;
ssl_certificate_key /tmp/b/server.key;

ssl_verify_client on;
ssl_verify_depth 2;
ssl_client_certificate /tmp/b/cachain.crt;
proxy_set_header X-SSL-Client-Cert $ssl_client_cert;
}

Linux下,打开【证书管理器】之后,打开tab【您的证书】,选择【kingqiu.p12】,输入密码,导入证书

Windows下,直接双击kingqiu.p12导入到默认的位置即可。

接下来再次访问就会弹出对话框选择证书,若无匹配的证书,就报错【400 Bad Request。No required SSL certificate was sent】

典型的问题

Linux下chrome无法导入intermediate.crt,ca.crt无问题,换成opera正常。chrome提示【证书授权中心导入错误-无法解析文件】

提示O/S等不匹配

Using configuration from /etc/pki/tls/openssl.cnf
Check that the request matches the signature
Signature ok
The countryName field needed to be the same in the
CA certificate (NL) and the request (RO)

king@king:/tmp/a$ sudo vi /etc/ssl/openssl.cnf

# For the CA policy
[ policy_match ]
countryName = optional
stateOrProvinceName = optional
organizationName = optional
#countryName = match
#stateOrProvinceName = match
#organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional

无法作为ca加入信任的授权中心

这通常是v3 extension的CA flag未启用或者不存在

king@king:/tmp/c$ openssl x509 -noout -text -in intermediate.crt 
Certificate:
Data:
Version: 3 (0x2)
Serial Number: 4096 (0x1000)
Signature Algorithm: sha256WithRSAEncryption
Issuer: C=CN, ST=fasdf, OU=R&D, CN=kingqiu
Validity
Not Before: Jul 14 10:00:14 2017 GMT
Not After : May 23 10:00:14 2027 GMT
Subject: C=CN, ST=afdasf, OU=R&D, CN=intermediate_ca
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (2048 bit)
Modulus:
00:c0:81:d1:fb:2b:b8:0f:b6:8b:30:04:46:d0:78:
28:8d
Exponent: 65537 (0x10001)
X509v3 extensions:
X509v3 Subject Key Identifier:
0D:6A:86:F1:62:AE:6A:74:05:12:31:09:BD:25:3B:B6:4E:98:BF:8E
X509v3 Authority Key Identifier:
keyid:E1:79:75:26:9D:C9:B4:FF:03:77:F9:5D:2F:BD:CB:C7:C6:DE:2A:20

X509v3 Basic Constraints: critical
# 注意这里
CA:TRUE, pathlen:0
X509v3 Key Usage: critical
Digital Signature, Certificate Sign, CRL Sign
Signature Algorithm: sha256WithRSAEncryption
ce:79:c0:61:5c:39:66:f2:cf:30:34:ee:8b:7c:e1:f5:24:53:
1e:d7:cf:ed

证书和签名学习汇总

证书相关的玩意,各种文件格式一大堆,大体可以分为:

  1. 公私钥文件(.key …)
  2. 证书文件(.pem …)
  3. 证书签名申请文件(.csr …)

具体的格式又分成两种

  1. PEM - Privacy Enhanced Mail,打开看文本格式,以”—–BEGIN…”开头, “—–END…”结尾,内容是BASE64编码
  2. DER - Distinguished Encoding Rules,打开看是二进制格式,不可读.

参考http://www.cnblogs.com/guogangj/p/4118605.html

查看PEM格式证书的信息:openssl x509 -in certificate.pem -text -noout
Apache和*NIX服务器偏向于使用这种编码格式.

查看DER格式证书的信息:openssl x509 -in certificate.der -inform der -text -noout
Java和Windows服务器偏向于使用这种编码格式.

关于DER格式,使用了偏门的编码ASN.1,参见这里

常用格式

key/pub

公钥私钥文件
下面的代码生成私钥

# 后面的数字可以成比率缩放
openssl genrsa -out server.key 2048

实际上生成的key文件包含了公钥和私钥,下面的命令导出公钥

openssl rsa -in server.key -pubout > server.pub

king@king:/tmp/a$ cat server.pub
-----BEGIN PUBLIC KEY-----
king@king:/tmp/a$ cat server.key
-----BEGIN RSA PRIVATE KEY-----

前面提到,pem格式都是已BEGIN开头,那么中间又区分若干类型,关于公私钥,有

  1. —–BEGIN RSA PUBLIC KEY—– #PKCS#1
  2. —–BEGIN RSA PRIVATE KEY—– #PKCS#1
  3. —–BEGIN PUBLIC KEY—– #PKCS#8
  4. —–BEGIN PRIVATE KEY—– #PKCS#8
  5. —–BEGIN ENCRYPTED PRIVATE KEY—– #encrypted PKCS#8

参见这里,具体而言,设计到PKCS#8 PKCS#1规范的差异

BEGIN RSA PRIVATE KEY is PKCS#1 and is just an RSA key. It is essentially just the key object from PKCS#8, but without the version or algorithm identifier in front. BEGIN PRIVATE KEY is PKCS#8 and indicates that the key type is included in the key data itself. From the link:

PKCS#8 PKCS#1转换

# 转pkcs8
openssl pkcs8 -topk8 -inform PEM -in server.key -outform pem -nocrypt -out server8.key
# 导出公钥
openssl rsa -in server8.key -pubout > server8.pub
# 转pkcs1
openssl rsa -in server8.key -out server1.key
king@king:/tmp/a$ cat server8.key
-----BEGIN PRIVATE KEY-----
king@king:/tmp/a$ cat server8.pub
-----BEGIN PUBLIC KEY-----
king@king:/tmp/a$ cat server1.key
-----BEGIN RSA PRIVATE KEY-----

参考这里

关于PKCS,参考

  1. https://zh.wikipedia.org/wiki/%E5%85%AC%E9%92%A5%E5%AF%86%E7%A0%81%E5%AD%A6%E6%A0%87%E5%87%86
  2. http://falchion.iteye.com/blog/1472453

查看Key

openssl rsa -in server8.key -text -noout
openssl rsa -in server1.key -text -noout
# Der格式
openssl rsa -in mykey.key -text -noout -inform der

其他

  1. pem 证书
  2. crt certificate的三个字母 常见于*NIX系统,有可能是PEM编码,也有可能是DER编码,大多数应该是PEM编码
  3. cer 还是certificate,常见于Windows系统,同样的,可能是PEM编码,也可能是DER编码,大多数应该是DER编码.
  4. csr ,即证书签名请求,这个并不是证书,而是向权威证书颁发机构获得签名证书的申请,其核心内容是一个公钥(当然还附带了一些别的信息),在生成这个申请的时候,同时也会生成一个私钥,私钥要自己保管好.
  5. pfx/p12 - predecessor of PKCS#12,对*nix服务器来说,一般CRT和KEY是分开存放在不同文件中的,但Windows的IIS则将它们存在一个PFX文件中,(因此这个文件包含了证书及私钥)这样会不会不安全?应该不会,PFX通常会有一个”提取密码”,你想把里面的东西读取出来的话,它就要求你提供提取密码
  6. jks 即Java Key Storage,这是Java的专利,跟OpenSSL关系不大,利用Java的一个叫”keytool”的工具,可以将PFX转为JKS

参考这里

证书编码的转换

  1. PEM转为DER

    openssl x509 -in cert.crt -outform der -out cert.der
  2. DER转为PEM

    openssl x509 -in cert.crt -inform der -outform pem -out cert.pem

(提示:要转换KEY文件也类似,只不过把x509换成rsa,要转CSR的话,把x509换成req…)

TLS

# 生成服务器端的私钥
openssl genrsa -out server.key 2048
# 生成服务器端证书
openssl req -new -x509 -key server.key -out server.pem -days 3650

查看证书

openssl x509 -noout -text -in server.pem

证书内容

X.509 数字证书标准,定义证书文件的结构和内容,详情参考 RFC5280,一般由用户公共密钥和用户标识符组成,此外还包括版本号、证书序列号、CA 标识符、签名算法标识、签发者名称、证书有效期等信息。

Golang TLS

  1. 客户端向服务器端索要并验证公钥。
  2. 双方协商生成”对话密钥”。
  3. 双方采用”对话密钥”进行加密通信。

Golang基础库对tls支持非常好,Go Package tls部分实现了 tls 1.2的功能,可以满足我们日常的应用。Package crypto/x509提供了证书管理的相关操作。参考http://colobu.com/2016/06/07/simple-golang-tls-examples/

server.go

package main
import (
"bufio"
"crypto/tls"
"log"
"net"
)
func main() {
cert, err := tls.LoadX509KeyPair("server.pem", "server.key")
if err != nil {
log.Println(err)
return
}
config := &tls.Config{Certificates: []tls.Certificate{cert}}
ln, err := tls.Listen("tcp", ":4433", config)
if err != nil {
log.Println(err)
return
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConn(conn)
}
}
func handleConn(conn net.Conn) {
defer conn.Close()
r := bufio.NewReader(conn)
for {
msg, err := r.ReadString('\n')
if err != nil {
log.Println(err)
return
}
println(msg)
n, err := conn.Write([]byte("world\n"))
if err != nil {
log.Println(n, err)
return
}
}
}

cli.go

package main
import (
"crypto/tls"
"log"
)
func main() {
conf := &tls.Config{
InsecureSkipVerify: true,
}
conn, err := tls.Dial("tcp", "127.0.0.1:4433", conf)
if err != nil {
log.Println(err)
return
}
defer conn.Close()
n, err := conn.Write([]byte("hello\n"))
if err != nil {
log.Println(n, err)
return
}
buf := make([]byte, 100)
n, err = conn.Read(buf)
if err != nil {
log.Println(n, err)
return
}
println(string(buf[:n]))
}

InsecureSkipVerify用来控制客户端是否证书和服务器主机名。如果设置为true,则不会校验证书以及证书中的主机名和服务器主机名是否一致。

验证客户端身份

在有的情况下,需要双向认证,服务器也需要验证客户端的真实性。在这种情况下,我们需要服务器和客户端进行一点额外的配置。

package main
import (
"bufio"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"net"
)
func main() {
cert, err := tls.LoadX509KeyPair("server.pem", "server.key")
if err != nil {
log.Println(err)
return
}
certBytes, err := ioutil.ReadFile("client.pem")
if err != nil {
panic("Unable to read cert.pem")
}
clientCertPool := x509.NewCertPool()
ok := clientCertPool.AppendCertsFromPEM(certBytes)
if !ok {
panic("failed to parse root certificate")
}
config := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: clientCertPool,
}
ln, err := tls.Listen("tcp", ":443", config)
if err != nil {
log.Println(err)
return
}
defer ln.Close()
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConn(conn)
}
}
func handleConn(conn net.Conn) {
defer conn.Close()
r := bufio.NewReader(conn)
for {
msg, err := r.ReadString('\n')
if err != nil {
log.Println(err)
return
}
println(msg)
n, err := conn.Write([]byte("world\n"))
if err != nil {
log.Println(n, err)
return
}
}
}

client.go

package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
)
func main() {
cert, err := tls.LoadX509KeyPair("client.pem", "client.key")
if err != nil {
log.Println(err)
return
}
certBytes, err := ioutil.ReadFile("client.pem")
if err != nil {
panic("Unable to read cert.pem")
}
clientCertPool := x509.NewCertPool()
ok := clientCertPool.AppendCertsFromPEM(certBytes)
if !ok {
panic("failed to parse root certificate")
}
conf := &tls.Config{
RootCAs: clientCertPool,
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
}
conn, err := tls.Dial("tcp", "127.0.0.1:443", conf)
if err != nil {
log.Println(err)
return
}
defer conn.Close()
n, err := conn.Write([]byte("hello\n"))
if err != nil {
log.Println(n, err)
return
}
buf := make([]byte, 100)
n, err = conn.Read(buf)
if err != nil {
log.Println(n, err)
return
}
println(string(buf[:n]))
}

证书

参考

  1. http://www.cnblogs.com/JeffreySun/archive/2010/06/24/1627247.html
  2. http://www.barretlee.com/blog/2016/04/24/detail-about-ca-and-certs/
  3. http://www.ruanyifeng.com/blog/2011/08/what_is_a_digital_signature.html
  4. https://en.wikipedia.org/wiki/Public_key_certificate

证书部分典型字段

  1. 证书的发布机构
  2. 证书的有效期
  3. 公钥
  4. 证书所有者(Subject)
  5. 签名所使用的算法
  6. 指纹以及指纹算法

◆Issuer (证书的发布机构)

指出是什么机构发布的这个证书,也就是指明这个证书是哪个公司创建的(只是创建证书,不是指证书的使用者)。例如”SecureTrust CA”

◆Valid from , Valid to (证书的有效期)

也就是证书的有效时间,或者说证书的使用期限。 过了有效期限,证书就会作废,不能使用了。

◆Public key (公钥)

这个我们在前面介绍公钥密码体制时介绍过,公钥是用来对消息进行加密的,第2章的例子中经常用到的。这个数字证书的公钥是2048位的,它的值可以在图的中间的那个对话框中看得到,是很长的一串数字。

◆Subject (主题)

这个证书是发布给谁的,或者说证书的所有者,一般是某个人或者某个公司名称、机构的名称、公司网站的网址等。 对于这里的证书来说,证书的所有者是Trustwave这个公司。

◆Signature algorithm (签名所使用的算法)

就是指的这个数字证书的数字签名所使用的加密算法,这样就可以使用证书发布机构的证书里面的公钥,根据这个算法对指纹进行解密。指纹的加密结果就是数字签名(第1.5节中解释过数字签名)。

◆Thumbprint, Thumbprint algorithm (指纹以及指纹算法)

这个是用来保证证书的完整性的,也就是说确保证书没有被修改过,这东西的作用和2.7中说到的第3个问题类似。 其原理就是在发布证书时,发布者根据指纹算法(一个hash算法)计算整个证书的hash值(指纹)并和证书放在一起,使用者在打开证书时,自己也根据指纹算法计算一下证书的hash值(指纹),如果和刚开始的值对得上,就说明证书没有被修改过,因为证书的内容被修改后,根据证书的内容计算的出的hash值(指纹)是会变化的。 注意,这个指纹会使用”SecureTrust CA”这个证书机构的私钥用签名算法(Signature algorithm)加密后和证书放在一起。

就是指的这个数字证书的数字签名所使用的加密算法,这样就可以使用证书发布机构的证书里面的公钥,根据这个算法对指纹进行解密。指纹的加密结果就是数字签名(第1.5节中解释过数字签名)。

如何向证书的发布机构去申请证书

举个例子方便大家理解,假设我们公司”ABC Company”花了1000块钱,向一个证书发布机构”SecureTrust CA”为我们自己的公司”ABC Company”申请了一张证书,注意,这个证书发布机构”SecureTrust CA”是一个大家公认并被一些权威机构接受的证书发布机构,我们的操作系统里面已经安装了”SecureTrust CA”的证书。”SecureTrust CA”在给我们发布证书时,把Issuer,Public key,Subject,Valid from,Valid to等信息以明文的形式写到证书里面,然后用一个指纹算法计算出这些数字证书内容的一个指纹,并把指纹和指纹算法用自己的私钥进行加密,然后和证书的内容一起发布。

证书的吊销

CA 证书的吊销存在两种机制,一种是在线检查,client 端向 CA 机构发送请求检查 server 公钥的靠谱性;第二种是 client 端储存一份 CA 提供的证书吊销列表,定期更新。前者要求查询服务器具备良好性能,后者要求每次更新提供下次更新的时间,一般时差在几天。安全性要求高的网站建议采用第一种方案。

大部分 CA 并不会提供吊销机制(CRL/OCSP),靠谱的方案是为根证书提供中间证书,一旦中间证书的私钥泄漏或者证书过期,可以直接吊销中间证书并给用户颁发新的证书。中间证书的签证原理于上上条提到的原理一样,中间证书还可以产生下一级中间证书,多级证书可以减少根证书的管理负担。

证书链

除了end-user之外,证书被分为root Certificates和intermediates Certificates。相应地,CA也分了两种类型:root CAs 和 intermediates CAs。首先,CA的组织结构是一个树结构,一个root CAs下面包含多个intermediates CAs,而intermediates又可以包含多个intermediates CAs。root CAs 和 intermediates CAs都可以颁发证书给用户,颁发的证书分别是root Certificates和intermediates Certificates,最终用户用来认证公钥的证书则被称为end-user Certificates。

我们使用end-user certificates来确保加密传输数据的公钥(public key)不被篡改,而又如何确保end-user certificates的合法性呢?这个认证过程跟公钥的认证过程类似,首先获取颁布end-user certificates的CA的证书,然后验证end-user certificates的signature。一般来说,root CAs不会直接颁布end-user certificates的,而是授权给多个二级CA,而二级CA又可以授权给多个三级CA,这些中间的CA就是intermediates CAs,它们才会颁布end-user certificates。

但是intermediates certificates的可靠性又如何保证呢?这就是涉及到证书链,Certificate Chain ,链式向上验证证书,直到Root Certificates

那Root Certificates又是如何来的呢?根据 https://support.dnsimple.com/articles/what-is-ssl-certificate-chain/ 这篇文章的说法,除了可以下载安装之外,device(例如浏览器,操作系统)都会内置一些root certificates,称之为trusted root certificates,https://support.apple.com/en-us/HT202858 ,在Apple的官网上可以看到这个列表,有各个操作版本直接内置的Root Certificates。

最后一个问题,为什么需要证书链这么麻烦的流程?Root CA为什么不直接版本证书,而是要搞那么多中间层级呢?找了一下,godaddy官方给了一个答案,为了确保root certificates的绝对安全性,https://sg.godaddy.com/en/help/what-is-an-intermediate-certificate-868 ,将根证书隔离地越严格越好。

证书请求(certificate sign request)

openssl genrsa -out ssl.key 2048
openssl req -new -key ssl.key -out ssl.csr

证书(certificate)和证书请求(certificate sign request)

  1. 证书是自签名或CA签名过的凭据,用来进行身份认证
  2. 证书请求是对签名的请求,需要使用私钥进行签名

x509命令可以将证书和证书请求相互转换

自签名证书

自签名的原理是用私钥对该私钥生成的证书请求进行签名,生成证书文件。该证书的签发者就是自己,所以验证方必须有该证书的私钥才能对签发信息进行验证,所以要么验证方信任一切证书,面临冒名顶替的风险,要么被验证方的私钥(或加密过的私钥)需要发送到验证方手中,面临私钥泄露的风险。

king@king:/tmp/a$ openssl rsa -in ssl.key -des3 -out encrypted.key
king@king:/tmp/a$ openssl req -new -key ssl.key -out ssl.csr
king@king:/tmp/a$ openssl x509 -req -in ssl.csr -signkey ssl.key -out ssl.crt

Exe签名

导出p12格式的证书,易被Windows下的工具支持

openssl pkcs12 -export -clcerts -in ssl.crt -inkey ssl.key -out ssl.p12
# 用Windows sdk的工具signtool实现exe文件签名
signtool sign /f ssl.pfx /p mypassword abc.exe

Https

  1. 服务器把证书信息发送给浏览器
  2. 浏览器现在开始验证证书的合法性,证书验证之后,随机一个对称加密的秘钥,并加密一段握手信息,并用公钥加密秘钥之后发给服务器
  3. 服务器用私钥解密之后得到秘钥,解密密文验证无误后,加密一段握手信息发送给浏览器
  4. 浏览器验证密文无误,开始SSL安全连接

问题

  1. 直接用rsa加密,效率太低
  2. 直接用rsa加密,回程无安全性

ketama算法Golang实现

源码 https://github.com/serialx/hashring

基础数据结构

type HashKey uint32

// 排序支持
type HashKeyOrder []HashKey
func (h HashKeyOrder) Len() int { return len(h) }
func (h HashKeyOrder) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h HashKeyOrder) Less(i, j int) bool { return h[i] < h[j] }

type HashRing struct {
// 一致性hash 虚拟node,以及关联的节点
ring map[HashKey]string
// 所有的虚拟node,(重新)生成之后,会排序,以便快速二分查找
// 通过二分查找,找到比查询key小的最大key,然后查找ring
sortedKeys []HashKey
// 原始数据,所有的节点
nodes []string
// 原始数据,所有的节点权重
weights map[string]int
}

初始化

func New(nodes []string) *HashRing {
// 声明一个对象
hashRing := &HashRing{
ring: make(map[HashKey]string),
sortedKeys: make([]HashKey, 0),
nodes: nodes,
weights: make(map[string]int),
}
// 生成虚拟node
hashRing.generateCircle()
return hashRing
}
// 和上个函数比,增加权重
func NewWithWeights(weights map[string]int) *HashRing {
nodes := make([]string, 0, len(weights))
for node, _ := range weights {
nodes = append(nodes, node)
}
hashRing := &HashRing{
ring: make(map[HashKey]string),
sortedKeys: make([]HashKey, 0),
nodes: nodes,
weights: weights,
}
hashRing.generateCircle()
return hashRing
}

生成虚拟node

核心逻辑

func (h *HashRing) generateCircle() {
// 获取总的权重
totalWeight := 0
for _, node := range h.nodes {
if weight, ok := h.weights[node]; ok {
totalWeight += weight
} else {
totalWeight += 1
}
}

// 依次对每个节点生成若干虚拟node
for _, node := range h.nodes {
weight := 1

// 是否指定了权重
if _, ok := h.weights[node]; ok {
weight = h.weights[node]
}

// 设置虚拟node的数量,40这个乘数因子可调整
// 数量越大,分布越均匀,但是性能相对较低
factor := math.Floor(float64(40*len(h.nodes)*weight) / float64(totalWeight))

// 依次生成虚拟node(里面还会细分成多个,实现细节问题)
for j := 0; j < int(factor); j++ {
// 生成一个key(受factor影响)
nodeKey := fmt.Sprintf("%s-%d", node, j)
// 生成MD5(16大小的[]byte)
bKey := hashDigest(nodeKey)
// 原代码只计算了前面12Byte,可以用完16byte,生成4个虚拟node
for i := 0; i < 4; i++ {
// 生成一个虚拟node的key
key := hashVal(bKey[i*4 : i*4+4])
// 关联后端的节点
h.ring[key] = node
// 插入到查找的slice
h.sortedKeys = append(h.sortedKeys, key)
}
}
}

// 排序,方便二分查找
sort.Sort(HashKeyOrder(h.sortedKeys))
}

查找

func (h *HashRing) GetNode(stringKey string) (node string, ok bool) {
// 查找key所在的虚拟node
pos, ok := h.GetNodePos(stringKey)
if !ok {
return "", false
}
// 返回后端节点
return h.ring[h.sortedKeys[pos]], true
}

func (h *HashRing) GenKey(key string) HashKey {
// 查询后端节点时,给key生成hash
bKey := hashDigest(key)
//MD5取前四位,比较简单
return hashVal(bKey[0:4])
}

func (h *HashRing) GetNodePos(stringKey string) (pos int, ok bool) {
// 无数据
if len(h.ring) == 0 {
return 0, false
}

// 生成
key := h.GenKey(stringKey)

nodes := h.sortedKeys
// 二分查找key
pos = sort.Search(len(nodes), func(i int) bool { return nodes[i] > key })

// 找不到就第一个
if pos == len(nodes) {
// Wrap the search, should return first node
return 0, true
} else {
// 否则返回找到的node
return pos, true
}
}

复制

为了保证数据可靠性,可以将每份数据都写入多个后端节点,一个小问题是

  1. 不同的key所在的后端节点有交叉
// size表示返回的副本数量
func (h *HashRing) GetNodes(stringKey string, size int) (nodes []string, ok bool) {
// 获得key所在的虚拟node
pos, ok := h.GetNodePos(stringKey)
if !ok {
return []string{}, false
}

// 没那么多节点
if size > len(h.nodes) {
return []string{}, false
}

// 由于节点关联的虚拟node交叉放置,下面的map用于判断是否已经get了这个节点
returnedValues := make(map[string]bool, size)
//mergedSortedKeys := append(h.sortedKeys[pos:], h.sortedKeys[:pos]...)
// 已经get的节点
resultSlice := make([]string, 0, size)

// 从key所在的虚拟node开始查找,到尾自动从第一个开始
for i := pos; i < pos+len(h.sortedKeys); i++ {
key := h.sortedKeys[i%len(h.sortedKeys)]
// 检查这个后端节点是否已经保存
val := h.ring[key]
if !returnedValues[val] {
// 没有就保存起来,并且标记已保存
returnedValues[val] = true
resultSlice = append(resultSlice, val)
}
if len(returnedValues) == size {
break
}
}

return resultSlice, len(resultSlice) == size
}

增删改节点

目前的实现都是先更新原始数据,然后重新生成虚拟node,个人认为优化的地方

  1. 由于每次新增一个都需要重新生成,若要更新多个会比较慢,可以增加批量接口
  2. 重新生成时,不知道变更的数据,若需要数据迁移,需要额外处理
func (h *HashRing) AddNode(node string) *HashRing {
return h.AddWeightedNode(node, 1)
}

func (h *HashRing) AddWeightedNode(node string, weight int) *HashRing {
if weight <= 0 {
return h
}

// 存在就不处理
for _, eNode := range h.nodes {
if eNode == node {
return h
}
}

// 加入到原始数据
nodes := make([]string, len(h.nodes), len(h.nodes)+1)
copy(nodes, h.nodes)
nodes = append(nodes, node)

weights := make(map[string]int)
for eNode, eWeight := range h.weights {
weights[eNode] = eWeight
}
weights[node] = weight

hashRing := &HashRing{
ring: make(map[HashKey]string),
sortedKeys: make([]HashKey, 0),
nodes: nodes,
weights: weights,
}
// 重新生成
hashRing.generateCircle()
return hashRing
}

Ngrok客户端源码学习笔记

关于ngrok的使用,参考http://blog.qiujinwu.com/2017/02/13/ngrok-reverse-proxy/

源码地址 https://github.com/inconshreveable/ngrok,我fork一份在https://github.com/qjw/ngrok,代码相对路径src/github.com/qjw/ngrok/src/ngrok/server/

main入口在src/github.com/qjw/ngrok/src/ngrok/main,分成客户端和服务端/ngrokd/ngrokd.go、/ngrok/ngrok.go。

.
├── cli.go #命令行相关
├── config.go #配置相关
├── controller.go # 控制器,用于管理view/网络/config/state等
├── main.go #入口
├── metrics.go #统计数据
├── model.go # 核心逻辑
├── mvc
│   ├── controller.go # controller interface
│   ├── model.go # interface
│   ├── state.go # interface和状态定义
│   └── view.go # interface
├── tls.go #tls加密
└── views #view
├── term #终端view
│   ├── area.go
│   ├── http.go
│   └── view.go
└── web #web view
├── http.go
└── view.go

入口

func Main() {
NewController().Run(config)
}
func NewController() *Controller {
ctl := &Controller{
Logger: log.NewPrefixLogger("controller"),
updates: util.NewBroadcast(),
cmds: make(chan command),
views: make([]mvc.View, 0),
state: make(chan mvc.State),
}

return ctl
}

执行启动逻辑

func (ctl *Controller) Run(config *Configuration) {
// Save the configuration
ctl.config = config

var model *ClientModel

// 创建model(核心逻辑所在)
if ctl.model == nil {
// 不存在就创建
model = ctl.SetupModel(config)
} else {
model = ctl.model.(*ClientModel)
}

// init the model
var state mvc.State = model

// 初始化web view
var webView *web.WebView
if config.InspectAddr != "disabled" {
webView = web.NewWebView(ctl, config.InspectAddr)
ctl.AddView(webView)
}

// 初始化term view
var termView *term.TermView
if config.LogTo != "stdout" {
termView = term.NewTermView(ctl)
ctl.AddView(termView)
}

// 将view关联到controller
for _, protocol := range model.GetProtocols() {
switch p := protocol.(type) {
case *proto.Http:
if termView != nil {
ctl.AddView(termView.NewHttpView(p))
}

if webView != nil {
ctl.AddView(webView.NewHttpView(p))
}
default:
}
}

// 核心逻辑入口
ctl.Go(ctl.model.Run)

done := make(chan int)
for {
select {
case obj := <-ctl.cmds:
// 关闭command
switch cmd := obj.(type) {
case cmdQuit:
msg := cmd.message
go func() {
// 等待退出
ctl.doShutdown()
fmt.Println(msg)
done <- 1
}()

// 回放命令
case cmdPlayRequest:
ctl.Go(func() { ctl.model.PlayRequest(cmd.tunnel, cmd.payload) })
}

// 更新state
case obj := <-updates:
state = obj.(mvc.State)

case ctl.state <- state:
// 退出
case <-done:
return
}
}
}

最终进入一个死循环,似乎作者并没有打算让它和谐地退出

func (c *ClientModel) Run() {
// how long we should wait before we reconnect
maxWait := 30 * time.Second
wait := 1 * time.Second

for {
// 开始发起连接请求,响应报文等
// 注意是阻塞的,若该函数返回了,说明掉线了
c.control()

// 失败后,第一次状态未变,此时重置wait的时间间隔
if c.connStatus == mvc.ConnOnline {
wait = 1 * time.Second
}

// sleep,避免无畏的不停请求,浪费服务器资源
time.Sleep(wait)
// 失败了继续加大重试的间隔
wait = 2 * wait
wait = time.Duration(math.Min(float64(wait), float64(maxWait)))
// 设置状态
c.connStatus = mvc.ConnReconnecting
// 刷新各种view
c.update()
}
}

退出

func (ctl *Controller) doShutdown() {
ctl.Info("Shutting down")

var wg sync.WaitGroup

// wait for all of the views, plus the model
wg.Add(len(ctl.views) + 1)

// 关闭所有的view
for _, v := range ctl.views {
vClosure := v
ctl.Go(func() {
vClosure.Shutdown()
wg.Done()
})
}

// 关闭model(核心逻辑)
ctl.Go(func() {
ctl.model.Shutdown()
wg.Done()
})

// 用WaitGroup等待多个goruntune
wg.Wait()
}

主流程

func (c *ClientModel) control() {
// establish control channel
var (
ctlConn conn.Conn
err error
)

// 向服务器发起连接
if c.proxyUrl == "" {
// simple non-proxied case, just connect to the server
ctlConn, err = conn.Dial(c.serverAddr, "ctl", c.tlsConfig)
} else {
ctlConn, err = conn.DialHttpProxy(c.proxyUrl, c.serverAddr, "ctl", c.tlsConfig)
}

// authenticate with the server
auth := &msg.Auth{
ClientId: c.id,
OS: runtime.GOOS,
Arch: runtime.GOARCH,
Version: version.Proto,
MmVersion: version.MajorMinor(),
User: c.authToken,
}

// 发送连接报文
if err = msg.WriteMsg(ctlConn, auth); err != nil {
panic(err)
}

// 等待响应
var authResp msg.AuthResp
if err = msg.ReadMsgInto(ctlConn, &authResp); err != nil {
panic(err)
}

if authResp.Error != "" {
emsg := fmt.Sprintf("Failed to authenticate to server: %s", authResp.Error)
c.ctl.Shutdown(emsg)
return
}

c.id = authResp.ClientId
c.serverVersion = authResp.MmVersion
c.Info("Authenticated with server, client id: %v", c.id)
// 更新view
c.update()
// 更新配置
if err = SaveAuthToken(c.configPath, c.authToken); err != nil {
c.Error("Failed to save auth token: %v", err)
}

// request tunnels
reqIdToTunnelConfig := make(map[string]*TunnelConfiguration)
for _, config := range c.tunnelConfig {
// create the protocol list to ask for
var protocols []string
for proto, _ := range config.Protocols {
protocols = append(protocols, proto)
}

// 注册tunnel
reqTunnel := &msg.ReqTunnel{
ReqId: util.RandId(8),
Protocol: strings.Join(protocols, "+"),
Hostname: config.Hostname,
Subdomain: config.Subdomain,
HttpAuth: config.HttpAuth,
RemotePort: config.RemotePort,
}

// 发送请求
if err = msg.WriteMsg(ctlConn, reqTunnel); err != nil {
panic(err)
}

// 存储这些配置,后面新建tunnel需要
reqIdToTunnelConfig[reqTunnel.ReqId] = config
}

// 开始心跳
lastPong := time.Now().UnixNano()
c.ctl.Go(func() { c.heartbeat(&lastPong, ctlConn) })

// main control loop
for {
var rawMsg msg.Message
if rawMsg, err = msg.ReadMsg(ctlConn); err != nil {
panic(err)
}

switch m := rawMsg.(type) {
case *msg.ReqProxy:
// 收到proxy请求,就向服务器发起一个proxy请求
c.ctl.Go(c.proxy)

case *msg.Pong:
// 更新心跳
atomic.StoreInt64(&lastPong, time.Now().UnixNano())

case *msg.NewTunnel:
// 注册tunnel确认
if m.Error != "" {
emsg := fmt.Sprintf("Server failed to allocate tunnel: %s", m.Error)
c.Error(emsg)
c.ctl.Shutdown(emsg)
continue
}

tunnel := mvc.Tunnel{
PublicUrl: m.Url,
LocalAddr: reqIdToTunnelConfig[m.ReqId].Protocols[m.Protocol],
Protocol: c.protoMap[m.Protocol],
}

// 保存tunnel对象,用于view的数据呈现
// 另外是申请proxy时,作校验
c.tunnels[tunnel.PublicUrl] = tunnel
// 更新状态
c.connStatus = mvc.ConnOnline
// 同步到view
c.update()

default:
ctlConn.Warn("Ignoring unknown control message %v ", m)
}
}
}

心跳

func (c *ClientModel) heartbeat(lastPongAddr *int64, conn conn.Conn) {
lastPing := time.Unix(atomic.LoadInt64(lastPongAddr)-1, 0)
ping := time.NewTicker(pingInterval)
pongCheck := time.NewTicker(time.Second)

for {
select {
// 检查是否过期
case <-pongCheck.C:
lastPong := time.Unix(0, atomic.LoadInt64(lastPongAddr))
needPong := lastPong.Sub(lastPing) < 0
pongLatency := time.Since(lastPing)

if needPong && pongLatency > maxPongLatency {
c.Info("Last ping: %v, Last pong: %v", lastPing, lastPong)
c.Info("Connection stale, haven't gotten PongMsg in %d seconds", int(pongLatency.Seconds()))
return
}

// 定期发送心跳
case <-ping.C:
err := msg.WriteMsg(conn, &msg.Ping{})
if err != nil {
conn.Debug("Got error %v when writing PingMsg", err)
return
}
lastPing = time.Now()
}
}
}

proxy

当收到一个proxy请求时,会新开一个goruntune来处理

func (c *ClientModel) proxy() {
var (
remoteConn conn.Conn
err error
)

// 尝试连接服务器
if c.proxyUrl == "" {
remoteConn, err = conn.Dial(c.serverAddr, "pxy", c.tlsConfig)
} else {
remoteConn, err = conn.DialHttpProxy(c.proxyUrl, c.serverAddr, "pxy", c.tlsConfig)
}
defer remoteConn.Close()

// 发送响应报文
err = msg.WriteMsg(remoteConn, &msg.RegProxy{ClientId: c.id})
if err != nil {
remoteConn.Error("Failed to write RegProxy: %v", err)
return
}

// 等待具体的业务报文,
// 之所以需要这么个报文,是因为需要这个报文中的内容来定位tunnel,进而获取本地端的参数
// 因为客户端也是一个数据中转,它一样有下游节点
var startPxy msg.StartProxy
if err = msg.ReadMsgInto(remoteConn, &startPxy); err != nil {
remoteConn.Error("Server failed to write StartProxy: %v", err)
return
}

// 找到tunnel
tunnel, ok := c.tunnels[startPxy.Url]
if !ok {
remoteConn.Error("Couldn't find tunnel for proxy: %s", startPxy.Url)
return
}

// 连接本地端
start := time.Now()
localConn, err := conn.Dial(tunnel.LocalAddr, "prv", nil)
if err != nil {
remoteConn.Warn("Failed to open private leg %s: %v", tunnel.LocalAddr, err)

if tunnel.Protocol.GetName() == "http" {
// ...
}
return
}
defer localConn.Close()

m := c.metrics
m.proxySetupTimer.Update(time.Since(start))
m.connMeter.Mark(1)
// 更新view
c.update()
m.connTimer.Time(func() {
localConn := tunnel.Protocol.WrapConn(localConn, \
mvc.ConnectionContext{Tunnel: tunnel, ClientAddr: startPxy.ClientAddr})
// 数据交换
bytesIn, bytesOut := conn.Join(localConn, remoteConn)
m.bytesIn.Update(bytesIn)
m.bytesOut.Update(bytesOut)
m.bytesInCount.Inc(bytesIn)
m.bytesOutCount.Inc(bytesOut)
})
c.update()
}

一个典型的proxy开始报文如下

{"Type":"StartProxy","Payload":{"Url":"http://qjw.ngrok.com:10080","ClientAddr":"127.0.0.1:52630"}}

广播

ngrok可以同时支持若干view,为了实时同步状态等数据,

type Controller struct {
// the model sends updates through this broadcast channel
updates *util.Broadcast
}

func NewBroadcast() *Broadcast {
b := &Broadcast{
listeners: make([]chan interface{}, 0),
reg: make(chan (chan interface{})),
unreg: make(chan (chan interface{})),
in: make(chan interface{}),
}

go func() {
for {
select {
// 取消注册
case l := <-b.unreg:
// remove L from b.listeners
// this operation is slow: O(n) but not used frequently
// unlike iterating over listeners
oldListeners := b.listeners
b.listeners = make([]chan interface{}, 0, len(oldListeners))
// 这个删除操作很蛋疼
for _, oldL := range oldListeners {
if l != oldL {
b.listeners = append(b.listeners, oldL)
}
}
// 注册操作
case l := <-b.reg:
b.listeners = append(b.listeners, l)
// 刷新操作
case item := <-b.in:
for _, l := range b.listeners {
l <- item
}
}
}
}()

return b
}

// 对外的刷新channel
func (b *Broadcast) In() chan interface{} {
return b.in
}

// 生成一个注册channel,用于注册
func (b *Broadcast) Reg() chan interface{} {
listener := make(chan interface{})
b.reg <- listener
return listener
}

// 取消注册一个channel
func (b *Broadcast) UnReg(listener chan interface{}) {
b.unreg <- listener
}

使用

func (ctl *Controller) Update(state mvc.State) {
ctl.updates.In() <- state
}
func NewWebView(ctl mvc.Controller, addr string) *WebView {
// handle web socket connections
http.HandleFunc("/_ws", func(w http.ResponseWriter, r *http.Request) {
// 注册一个channel
msgs := wv.wsMessages.Reg()
// 退出是取消注册
defer wv.wsMessages.UnReg(msgs)
// 监听这个channel
for m := range msgs {
err := conn.WriteMessage(websocket.TextMessage, m.([]byte))
if err != nil {
// connection is closed
break
}
}
})
return wv
}

拦截Http Request/Response

为了支持view呈现,需要拦截获取经过客户端的所有req/resp

Protocol

每种tunnel 会有一种protocol,代码定义在src/github.com/qjw/ngrok/src/ngrok/proto/

在初始化view时,会设置proto到view对象中

for _, protocol := range model.GetProtocols() {
switch p := protocol.(type) {
case *proto.Http:
if termView != nil {
ctl.AddView(termView.NewHttpView(p))
}

if webView != nil {
ctl.AddView(webView.NewHttpView(p))
}
default:
}
}

func (v *TermView) NewHttpView(p *proto.Http) *HttpView {
return newTermHttpView(v.ctl, v, p, 0, 12)
}
func (wv *WebView) NewHttpView(proto *proto.Http) *WebHttpView {
return newWebHttpView(wv.ctl, wv, proto)
}

刷新

以webview为例

func newWebHttpView(ctl mvc.Controller, wv *WebView, proto *proto.Http) *WebHttpView {
whv := &WebHttpView{
Logger: log.NewPrefixLogger("view", "web", "http"),
webview: wv,
ctl: ctl,
httpProto: proto,
idToTxn: make(map[string]*SerializedTxn),
HttpRequests: util.NewRing(20),
}
// 实时刷新
ctl.Go(whv.updateHttp)
whv.register()
return whv
}

func (whv *WebHttpView) updateHttp() {
// open channels for incoming http state changes
// and broadcasts
txnUpdates := whv.httpProto.Txns.Reg()
// 监听whv.httpProto.Txns
for txn := range txnUpdates {
// 获得实际的对象
htxn := txn.(*proto.HttpTxn)

// 。。。 刷新操作
}
type Http struct {
// 可以看到Txns是一个广播对象
Txns *util.Broadcast
reqGauge metrics.Gauge
reqMeter metrics.Meter
reqTimer metrics.Timer
}

在前面的代码中,会通过包装本地的conn

localConn := tunnel.Protocol.WrapConn(localConn, param)

func (h *Http) WrapConn(c conn.Conn, ctx interface{}) conn.Conn {
tee := conn.NewTee(c)
// 用一个管道将获取Request和Response串起来
lastTxn := make(chan *HttpTxn)
// 读取Request
go h.readRequests(tee, lastTxn, ctx)
// 读取Response
go h.readResponses(tee, lastTxn)
return tee
}
func (h *Http) readRequests(tee *conn.Tee, lastTxn chan *HttpTxn, connCtx interface{}) {
defer close(lastTxn)

for {
// 不停地从tee的写tee中解析Request
req, err := http.ReadRequest(tee.WriteBuffer())
if err != nil {
// no more requests to be read, we're done
break
}

// golang's ReadRequest/DumpRequestOut is broken. Fix up the request so it works later
req.URL.Scheme = "http"
req.URL.Host = req.Host

// 生成一个HttpTxn对象
txn := &HttpTxn{Start: time.Now(), ConnUserCtx: connCtx}
txn.Req = &HttpRequest{Request: req}
if req.Body != nil {
txn.Req.BodyBytes, txn.Req.Body, err = extractBody(req.Body)
if err != nil {
tee.Warn("Failed to extract request body: %v", err)
}
}

// 发送到Req/Resp共享的channel,通知resp逻辑解析Response
lastTxn <- txn
// 通知view刷新
h.Txns.In() <- txn
}
}
func (h *Http) readResponses(tee *conn.Tee, lastTxn chan *HttpTxn) {
for txn := range lastTxn {
// 当req解析完之后,会触发resp解析
// 从tee的读tee中不停地解析Response
resp, err := http.ReadResponse(tee.ReadBuffer(), txn.Req.Request)
txn.Duration = time.Since(txn.Start)
h.reqTimer.Update(txn.Duration)
if err != nil {
tee.Warn("Error reading response from server: %v", err)
// no more responses to be read, we're done
break
}

// 更新HttpTxn对象的Response
txn.Resp = &HttpResponse{Response: resp}
// apparently, Body can be nil in some cases
if resp.Body != nil {
txn.Resp.BodyBytes, txn.Resp.Body, err = extractBody(resp.Body)
if err != nil {
tee.Warn("Failed to extract response body: %v", err)
}
}

// 通知view刷新
h.Txns.In() <- txn
}
}

Conn.Tee

func NewTee(conn Conn) *Tee {
c := &Tee{
rd: nil,
wr: nil,
Conn: conn,
}

c.readPipe.rd, c.readPipe.wr = io.Pipe()
c.writePipe.rd, c.writePipe.wr = io.Pipe()

// 读取的时候,一并拷贝一份到c.readPipe.wr,那么c.readPipe.rd就可读
// 参考ReadBuffer
c.rd = io.TeeReader(c.Conn, c.readPipe.wr)
// 当写入的时候,一并写入c.writePipe.wr,那么c.writePipe.rz就可读
// 参考WriteBuffer
c.wr = io.MultiWriter(c.Conn, c.writePipe.wr)
return c
}

func (c *Tee) ReadBuffer() *bufio.Reader {
return bufio.NewReader(c.readPipe.rd)
}

func (c *Tee) WriteBuffer() *bufio.Reader {
return bufio.NewReader(c.writePipe.rd)
}

Ngrok服务端源码学习笔记

关于ngrok的使用,参考http://blog.qiujinwu.com/2017/02/13/ngrok-reverse-proxy/

源码地址 https://github.com/inconshreveable/ngrok,我fork一份在https://github.com/qjw/ngrok,代码相对路径src/github.com/qjw/ngrok/src/ngrok/server/

main入口在src/github.com/qjw/ngrok/src/ngrok/main,分成客户端和服务端/ngrokd/ngrokd.go、/ngrok/ngrok.go。

.
├── cli.go 命令行相关
├── control.go 客户端和服务端控制连接逻辑
├── http.go 处理来自公网的http(s)的请求逻辑
├── main.go 入口
├── metrics.go 一些统计相关的东西
├── registry.go 存储全局对象的"容器"
├── tls.go tls加密相关
└── tunnel.go 客户端和服务端隧道逻辑

术语

在了解ngrok服务器原理之前,有几个术语需要区分

  1. controller,控制器,每个客户端对应一个controller,并且会绑定一条tcp连接,默认使用tls加密,代码逻辑control.go,conroller用于传输各种控制指令
  2. tunnel,隧道,一个客户端到服务端有多个隧道,每个隧道有个TYPE,例如http、https、tcp,以及URL,例如test.ngrok.qiujinwu.com【假设ngrok.qiujinwu.com绑定到了服务器】。在服务端代码中,tunnel是个虚拟的实体,并没有绑定的tcp连接。当收到来自公网的请求时,会根据隧道url来匹配客户端
  3. proxy,表示客户端到服务端的数据链路,根据外网请求的多少,客户端到服务端会有多个proxy
  4. listener,tcp服务器,默认会开启(http/https/tunnel三个tcp服务器,tcp类型不明确)

message

在controller中,会发送各种控制指令,这些指令定义在src/github.com/qjw/ngrok/src/ngrok/msg/msg.go,大体而言可以分为三类

  1. 控制指令,用于客户端连接服务器的协商报文
  2. proxy指令,用于服务器请求新的数据链路(由于客户端属于内网,ngrok服务器无法主动建立到客户端的连接,所以服务器会先走控制连接通知客户端,让它发起数据连接请求
  3. 心跳

序列化

序列化比较简单,把报文序列化成字符串,并且把报文名称放在最前面,代码src/github.com/qjw/ngrok/src/ngrok/msg/pack.go,例如

{"Type":"RegProxy","Payload":{"ClientId":"8c57f5cfd5b30dc3215f740f2ad72539"}}

src/github.com/qjw/ngrok/src/ngrok/msg/conn.go有一些从tcp连接(反)序列化的工具函数

全局对象

// GLOBALS
var (
// 所有的tunnel
tunnelRegistry *TunnelRegistry
// 所有的controller
controlRegistry *ControlRegistry

// 参数
opts *Options
// 所有的tcp服务器
listeners map[string]*conn.Listener
)
// ControlRegistry maps a client ID to Control structures
type ControlRegistry struct {
controls map[string]*Control
log.Logger
sync.RWMutex
}
// TunnelRegistry maps a tunnel URL to Tunnel structures
type TunnelRegistry struct {
tunnels map[string]*Tunnel
affinity *cache.LRUCache
log.Logger
sync.RWMutex
}
type Tunnel struct {
// request that opened the tunnel
req *msg.ReqTunnel

// ...
// 关联到control
ctl *Control
// ...
}

连接建立

在服务器接收请求之前,会新建一个listenr,这个对象对tcp服务器进行了封装

Listener

src/github.com/qjw/ngrok/src/ngrok/conn/conn.go

type Listener struct {
net.Addr
// 将请求accept的新tcp连接放入channel
Conns chan *loggedConn
}

type loggedConn struct {
tcp *net.TCPConn
net.Conn
log.Logger
id int32
typ string
}

将net.TCPConn包装成loggedConn,用于区分日志,设置type,id等

func wrapConn(conn net.Conn, typ string) *loggedConn {
switch c := conn.(type) {
case *vhost.HTTPConn:
wrapped := c.Conn.(*loggedConn)
return &loggedConn{wrapped.tcp, conn, wrapped.Logger, wrapped.id, wrapped.typ}
case *loggedConn:
return c
case *net.TCPConn:
wrapped := &loggedConn{c, conn, log.NewPrefixLogger(), rand.Int31(), typ}
wrapped.AddLogPrefix(wrapped.Id())
return wrapped
}

return nil
}

func Listen(addr, typ string, tlsCfg *tls.Config) (l *Listener, err error) {
// listen for incoming connections
listener, err := net.Listen("tcp", addr)
// 。。。
l = &Listener{
Addr: listener.Addr(),
Conns: make(chan *loggedConn),
}

go func() {
for {
rawConn, err := listener.Accept()
c := wrapConn(rawConn, typ)
// 新的连接放入channel
l.Conns <- c
}
}()
return
}

接收请求

func tunnelListener(addr string, tlsConfig *tls.Config) {
// 建立tcp服务器
listener, err := conn.Listen(addr, "tun", tlsConfig)
if err != nil {
panic(err)
}

// 从channel中等待新的请求到来
for c := range listener.Conns {
// 每个连接用新的goroutune
go func(tunnelConn conn.Conn) {
defer func() {
// 自动处理异常
if r := recover(); r != nil {
tunnelConn.Info("tunnelListener failed with error %v: %s", r, debug.Stack())
}
}()

// 读取消息
var rawMsg msg.Message
if rawMsg, err = msg.ReadMsg(tunnelConn); err != nil {
return
}

switch m := rawMsg.(type) {
case *msg.Auth:
// 建立控制连接(controller)
NewControl(tunnelConn, m)

case *msg.RegProxy:
// 建立数据连接
NewProxy(tunnelConn, m)

default:
tunnelConn.Close()
}
}(c)
}
}

一个典型的请求报文

{
"Type": "Auth",
"Payload": {
"Version": "2",
"MmVersion": "1.7",
"User": "",
"Password": "",
"OS": "linux",
"Arch": "amd64",
"ClientId": "8c57f5cfd5b30dc3215f740f2ad72539"
}
}

响应

{
"Type": "AuthResp",
"Payload": {
"Version": "2",
"MmVersion": "1.7",
"ClientId": "8c57f5cfd5b30dc3215f740f2ad72539",
"Error": ""
}
}

Control

func NewControl(ctlConn conn.Conn, authMsg *msg.Auth) {
var err error

// create the object
c := &Control{
auth: authMsg,
conn: ctlConn,
out: make(chan msg.Message),
in: make(chan msg.Message),
proxies: make(chan conn.Conn, 10),
lastPing: time.Now(),
writerShutdown: util.NewShutdown(),
readerShutdown: util.NewShutdown(),
managerShutdown: util.NewShutdown(),
shutdown: util.NewShutdown(),
}

// 设置属性
ctlConn.SetType("ctl")
ctlConn.AddLogPrefix(c.id)

// 版本判断
if authMsg.Version != version.Proto {
failAuth(fmt.Errorf("Incompatible versions. Server %s, client %s. Download a new version at http://ngrok.com", version.MajorMinor(), authMsg.Version))
return
}

// 新增/更新control到全局Registry
if replaced := controlRegistry.(c.id, c); replaced != nil {
// 等待旧的完全关闭
replaced.shutdown.WaitComplete()
}

// 新的goruntune监听写(需要最先开启,以便回复连接请求报文)
go c.writer()

// 回复连接建立确认报文
c.out <- &msg.AuthResp{
Version: version.Proto,
MmVersion: version.MajorMinor(),
ClientId: c.id,
}

// 预先申请一个proxy连接
c.out <- &msg.ReqProxy{}

// 一堆其他的后台goroutune
go c.manager()
go c.reader()
go c.stopper()
}
// 发送控制消息
func (c *Control) writer() {
for m := range c.out {
if err := msg.WriteMsg(c.conn, m); err != nil {
panic(err)
}
}
}
// 接收控制消息
func (c *Control) reader() {
for {
if msg, err := msg.ReadMsg(c.conn); err != nil {
if err == io.EOF {
c.conn.Info("EOF")
return
}
} else {
// 推送到c.in channel中
c.in <- msg
}
}
}
func (c *Control) manager() {
// 检查control超时
reap := time.NewTicker(connReapInterval)

for {
select {
case <-reap.C:
// 检查是否超时
if time.Since(c.lastPing) > pingTimeoutInterval {
c.conn.Info("Lost heartbeat")
c.shutdown.Begin()
}
case mRaw, ok := <-c.in:
// 是否有新的消息
if !ok {
return
}

switch m := mRaw.(type) {
case *msg.ReqTunnel:
// 客户端注册一个新的tunnel
c.registerTunnel(m)

case *msg.Ping:
// 回复心跳
c.lastPing = time.Now()
c.out <- &msg.Pong{}
}
}
}
}

注册controller

func (r *ControlRegistry) Add(clientId string, ctl *Control) (oldCtl *Control) {
r.Lock()
defer r.Unlock()

oldCtl = r.controls[clientId]
if oldCtl != nil {
oldCtl.Replaced(ctl)
}

r.controls[clientId] = ctl
r.Info("Registered control with id %s", clientId)
return
}

退出流程

退出流程可以考虑用https://golang.org/pkg/context/简化下,这里用到了一个util.Shutdown的工具库(src/github.com/qjw/ngrok/src/ngrok/util/shutdown.go)。

type Control struct {
// synchronizer for controlled shutdown of writer()
writerShutdown *util.Shutdown

// synchronizer for controlled shutdown of reader()
readerShutdown *util.Shutdown

// synchronizer for controlled shutdown of manager()
managerShutdown *util.Shutdown

// synchronizer for controller shutdown of entire Control
shutdown *util.Shutdown
}


func (c *Control) reader() {
// kill everything if the reader stops
defer c.shutdown.Begin()
// notify that we're done
defer c.readerShutdown.Complete()
}
func (c *Control) writer() {
// kill everything if the writer() stops
defer c.shutdown.Begin()

// notify that we've flushed all messages
defer c.writerShutdown.Complete()
}
func (c *Control) manager() {
// kill everything if the control manager stops
defer c.shutdown.Begin()

// notify that manager() has shutdown
defer c.managerShutdown.Complete()
}

有个专门的goruntune来监听退出

func (c *Control) stopper() {
// 等待
c.shutdown.WaitBegin()

// 注销controller
controlRegistry.Del(c.id)

// 等待各种子goruntune注销
// close会触发其他的goruntune退出
close(c.in)
c.managerShutdown.WaitComplete()

// shutdown writer()
close(c.out)
c.writerShutdown.WaitComplete()

// 关闭空置连接
c.conn.Close()

// 关闭各种tunnel
for _, t := range c.tunnels {
// 调用shutdown,
t.Shutdown()
}

// 关闭各种proxy连接
close(c.proxies)
for p := range c.proxies {
p.Close()
}

// 最终关闭
c.shutdown.Complete()
c.conn.Info("Shutdown complete")
}

Tunnel注册

// Register a new tunnel on this control connection
func (c *Control) registerTunnel(rawTunnelReq *msg.ReqTunnel) {
// 若有多个tunnel,可以用一个报文一次性注册
for _, proto := range strings.Split(rawTunnelReq.Protocol, "+") {
tunnelReq := *rawTunnelReq
tunnelReq.Protocol = proto

c.conn.Debug("Registering new tunnel")
t, err := NewTunnel(&tunnelReq, c)
if err != nil {
// 回复注册失败确认
c.out <- &msg.NewTunnel{Error: err.Error()}
return
}

// 注册到controller
c.tunnels = append(c.tunnels, t)

// 回复注册成功确认
c.out <- &msg.NewTunnel{
Url: t.url,
Protocol: proto,
ReqId: rawTunnelReq.ReqId,
}

rawTunnelReq.Hostname = strings.Replace(t.url, proto+"://", "", 1)
}
}

一个典型的注册报文

{
"Type": "ReqTunnel",
"Payload": {
"ReqId": "dd1819bd088d7675",
"Protocol": "http",
"Hostname": "",
"Subdomain": "qjw",
"HttpAuth": "",
"RemotePort": 0
}
}

响应

{
"Type": "NewTunnel",
"Payload": {
"ReqId": "dd1819bd088d7675",
"Url": "http://qjw.ngrok.com:10080",
"Protocol": "http",
"Error": ""
}
}

// Create a new tunnel from a registration message received
// on a control channel
func NewTunnel(m *msg.ReqTunnel, ctl *Control) (t *Tunnel, err error) {
t = &Tunnel{
req: m,
start: time.Now(),
ctl: ctl,
Logger: log.NewPrefixLogger(),
}

proto := t.req.Protocol
switch proto {
case "tcp":
// ...
return

case "http", "https":
l, ok := listeners[proto]
// 注册vhost,之所以v,是因为多个url共享同一个端口,类似于nginx的server
if err = registerVhost(t, proto, l.Addr.(*net.TCPAddr).Port); err != nil {
return
}

default:
}
t.AddLogPrefix(t.Id())
return
}

这里主要确认vhost的参数,最重要的host,port,这对于公网连接的请求路由至关重要

var defaultPortMap = map[string]int{
"http": 80,
"https": 443,
"smtp": 25,
}

// Common functionality for registering virtually hosted protocols
func registerVhost(t *Tunnel, protocol string, servingPort int) (err error) {
vhost := os.Getenv("VHOST")
if vhost == "" {
vhost = fmt.Sprintf("%s:%d", opts.domain, servingPort)
}

// Canonicalize virtual host by removing default port (e.g. :80 on HTTP)
defaultPort, ok := defaultPortMap[protocol]
if !ok {
return fmt.Errorf("Couldn't find default port for protocol %s", protocol)
}

// 移除默认的端口(比如80可以忽略,81就必须明确地出现在连接中)
defaultPortSuffix := fmt.Sprintf(":%d", defaultPort)
if strings.HasSuffix(vhost, defaultPortSuffix) {
vhost = vhost[0 : len(vhost)-len(defaultPortSuffix)]
}

// Canonicalize by always using lower-case
vhost = strings.ToLower(vhost)

// 从请求中获取host
hostname := strings.ToLower(strings.TrimSpace(t.req.Hostname))
if hostname != "" {
t.url = fmt.Sprintf("%s://%s", protocol, hostname)
// 注册tunnel
return tunnelRegistry.Register(t.url, t)
}

// 未指定host,指定了subdomain,就通过服务器启动参数domain来拼装host
subdomain := strings.ToLower(strings.TrimSpace(t.req.Subdomain))
if subdomain != "" {
t.url = fmt.Sprintf("%s://%s.%s", protocol, subdomain, vhost)
// 注册tunnel
return tunnelRegistry.Register(t.url, t)
}

// 随机生成一个url
t.url, err = tunnelRegistry.RegisterRepeat(func() string {
return fmt.Sprintf("%s://%x.%s", protocol, rand.Int31(), vhost)
}, t)

return
}

func (t *Tunnel) Shutdown() {
// 取消注册
tunnelRegistry.Del(t.url)
}

Proxy

在连接建立之后,服务器就会预先请求一个proxy

{"Type":"ReqProxy","Payload":{}}

这个clientid非常重要,用于关联到controller

{"Type":"RegProxy","Payload":{"ClientId":"8c57f5cfd5b30dc3215f740f2ad72539"}}

func NewControl(ctlConn conn.Conn, authMsg *msg.Auth) {
// 预先申请一个proxy
c.out <- &msg.ReqProxy{}
}
func tunnelListener(addr string, tlsConfig *tls.Config) {
for c := range listener.Conns {
go func(tunnelConn conn.Conn) {
switch m := rawMsg.(type) {
case *msg.RegProxy:
// 新增一个proxy
NewProxy(tunnelConn, m)
}
}(c)
}
}

proxy并没有全局的对象来注册,而是简单地关联到controller(通过之前的clientid),具体就是通过一个带缓存的channel

type Control struct {
// proxy connections
proxies chan conn.Conn
}

func NewControl(ctlConn conn.Conn, authMsg *msg.Auth) {
c := &Control{
// 10个元素的channel
proxies: make(chan conn.Conn, 10),
}
}

func NewProxy(pxyConn conn.Conn, regPxy *msg.RegProxy) {
pxyConn.SetType("pxy")

// 查询controller
ctl := controlRegistry.Get(regPxy.ClientId)

// 注册
ctl.RegisterProxy(pxyConn)
}

当需要获取proxy时,就直接调用下面的函数

func (c *Control) GetProxy() (proxyConn conn.Conn, err error) {
var ok bool

// get a proxy connection from the pool
select {
// 直接从channel中获取
case proxyConn, ok = <-c.proxies:
if !ok {
err = fmt.Errorf("No proxy connections available, control is closing")
return
}
default:
// 没有的话,立即请求客户端
if err = util.PanicToError(func() { c.out <- &msg.ReqProxy{} }); err != nil {
return
}

// 继续从channle中获取
select {
case proxyConn, ok = <-c.proxies:
if !ok {
err = fmt.Errorf("No proxy connections available, control is closing")
return
}

case <-time.After(pingTimeoutInterval):
err = fmt.Errorf("Timeout trying to get proxy connection")
return
}
}
return
}

处理公网请求

func Main() {
// listen for http
if opts.httpAddr != "" {
listeners["http"] = startHttpListener(opts.httpAddr, nil)
}
}
// Listens for new http(s) connections from the public internet
func startHttpListener(addr string, tlsCfg *tls.Config) (listener *conn.Listener) {
// 创建服务器
var err error
if listener, err = conn.Listen(addr, "pub", tlsCfg); err != nil {
panic(err)
}

proto := "http"
if tlsCfg != nil {
proto = "https"
}

log.Info("Listening for public %s connections on %v", proto, listener.Addr.String())
go func() {
for conn := range listener.Conns {
// 每个连接都会调用下面的goruntune
go httpHandler(conn, proto)
}
}()

return
}
func httpHandler(c conn.Conn, proto string) {

// 获取Http头
vhostConn, err := vhost.HTTP(c)
if err != nil {
c.Warn("Failed to read valid %s request: %v", proto, err)
c.Write([]byte(BadRequest))
return
}

// 获取http参数
host := strings.ToLower(vhostConn.Host())
auth := vhostConn.Request.Header.Get("Authorization")

// done reading mux data, free up the request memory
vhostConn.Free()

// We need to read from the vhost conn now since it mucked around reading the stream
c = conn.Wrap(vhostConn, "pub")

// 从全局的Registry中查找tunnel
tunnel := tunnelRegistry.Get(fmt.Sprintf("%s://%s", proto, host))
if tunnel == nil {
c.Info("No tunnel found for hostname %s", host)
c.Write([]byte(fmt.Sprintf(NotFound, len(host)+18, host)))
return
}

// 检查认证
if tunnel.req.HttpAuth != "" && auth != tunnel.req.HttpAuth {
c.Info("Authentication failed: %s", auth)
c.Write([]byte(NotAuthorized))
return
}

// 数据交换
tunnel.HandlePublicConnection(c)
}

Http头

在数据交换过程中,为了作请求路由,必须先从路由中解析Http头(不考虑tcp tunnel),然后根据http头来作数据路由,而读出来的头,也必须原原本本的下发到客户端。

const (
initVhostBufSize = 1024 // allocate 1 KB up front to try to avoid resizing
)

type sharedConn struct {
sync.Mutex
net.Conn // the raw connection
vhostBuf *bytes.Buffer // all of the initial data that has to be read in order to vhost a connection is saved here
}

func newShared(conn net.Conn) (*sharedConn, io.Reader) {
c := &sharedConn{
Conn: conn,
// 分配一块内存,用于存储http头,以便原原本本的下发到客户端
vhostBuf: bytes.NewBuffer(make([]byte, 0, initVhostBufSize)),
}

// 当从conn读取数据后,复制一份到vhostBuf
return c, io.TeeReader(conn, c.vhostBuf)
}

func (c *sharedConn) Read(p []byte) (n int, err error) {
c.Lock()
// 已经读取到内存中的数据已经发送完
if c.vhostBuf == nil {
c.Unlock()
return c.Conn.Read(p)
}

// 优先从buf中读取
n, err = c.vhostBuf.Read(p)

// end of the request buffer
if err == io.EOF {
// let the request buffer get garbage collected
// and make sure we don't read from it again
c.vhostBuf = nil

// 读了一半继续从con中读取
// continue reading from the connection
var n2 int
n2, err = c.Conn.Read(p[n:])

// update total read
n += n2
}
c.Unlock()
return
}

vhost.HTTP

func HTTP(conn net.Conn) (httpConn *HTTPConn, err error) {
// 创建一个tee的conn
c, rd := newShared(conn)

// 解析Http头,不得不说go的系统库很牛X
httpConn = &HTTPConn{sharedConn: c}
if httpConn.Request, err = http.ReadRequest(bufio.NewReader(rd)); err != nil {
return
}

// body不需要
httpConn.Request.Body.Close()
return
}

由于有了sharedConn这一层封装,在数据交换时,完全可以不考虑,一部分数据已经为了解析http头而实现读取出来过的细节

数据交换

func (t *Tunnel) HandlePublicConnection(publicConn conn.Conn) {
for i := 0; i < (2 * proxyMaxPoolSize); i++ {
// 获取一个proxy
if proxyConn, err = t.ctl.GetProxy(); err != nil {
t.Warn("Failed to get proxy connection: %v", err)
return
}
// 自动关闭和设置属性
defer proxyConn.Close()
t.Info("Got proxy connection %s", proxyConn.Id())
proxyConn.AddLogPrefix(t.Id())

// 发送数据,提示开始传输数据
startPxyMsg := &msg.StartProxy{
Url: t.url,
ClientAddr: publicConn.RemoteAddr().String(),
}
if err = msg.WriteMsg(proxyConn, startPxyMsg); err != nil {
proxyConn.Warn("Failed to write StartProxyMessage: %v, attempt %d", err, i)
proxyConn.Close()
} else {
// success
break
}
}

// 立即申请一个新的proxy
util.PanicToError(func() { t.ctl.out <- &msg.ReqProxy{} })

// 交换数据
bytesIn, bytesOut := conn.Join(publicConn, proxyConn)
}

通过一个join实现双向的数据交换,这个地方可以做一些部分优化,直接从内核到内核传数据,参考sendfilesplice

func Join(c Conn, c2 Conn) (int64, int64) {
var wait sync.WaitGroup

pipe := func(to Conn, from Conn, bytesCopied *int64) {
defer to.Close()
defer from.Close()
defer wait.Done()

var err error
// 这个地方可以优化,在内部有一个应用层的buf作中转
*bytesCopied, err = io.Copy(to, from)
if err != nil {
from.Warn("Copied %d bytes to %s before failing with error %v", *bytesCopied, to.Id(), err)
} else {
from.Debug("Copied %d bytes to %s", *bytesCopied, to.Id())
}
}

wait.Add(2)
var fromBytes, toBytes int64
// 开启两个goruntune来实现双向交换
go pipe(c, c2, &fromBytes)
go pipe(c2, c, &toBytes)
c.Info("Joined with connection %s", c2.Id())
wait.Wait()
return fromBytes, toBytes
}

tls加密

对于tls支持,go系统库支持的非常好,参考http://colobu.com/2016/06/07/simple-golang-tls-examples/

ngrok有点讨巧,用系统库的工具函数在tcp裸连接做了一层包装

func Listen(addr, typ string, tlsCfg *tls.Config) (l *Listener, err error) {
// 监听tcp端口
listener, err := net.Listen("tcp", addr)

// 声明对象
l = &Listener{
Addr: listener.Addr(),
Conns: make(chan *loggedConn),
}

go func() {
for {
rawConn, err := listener.Accept()

// 处理新连接
c := wrapConn(rawConn, typ)
// 若指定了tls配置(https tunnel必需)
if tlsCfg != nil {
// 直接将原来的裸tcp conn替换成被tls包装过的conn,所有的read/write会被tls层先做加工再下发
c.Conn = tls.Server(c.Conn, tlsCfg)
}
l.Conns <- c
}
}()
return
}

Client

在ngrok客户端也是同样的逻辑

func Dial(addr, typ string, tlsCfg *tls.Config) (conn *loggedConn, err error) {
var rawConn net.Conn
if rawConn, err = net.Dial("tcp", addr); err != nil {
return
}

conn = wrapConn(rawConn, typ)
conn.Debug("New connection to: %v", rawConn.RemoteAddr())

// 若指定了tls配置(https tunnel必需)
if tlsCfg != nil {
conn.StartTLS(tlsCfg)
}
return
}
func (c *loggedConn) StartTLS(tlsCfg *tls.Config) {
// 直接将原来的裸tcp conn替换成被tls包装过的conn,所有的read/write会被tls层先做加工再下发
c.Conn = tls.Client(c.Conn, tlsCfg)
}

Golang 匿名对象指针和对象的区别

都说一图胜千言,有代码就不废话了

package main

import (
"fmt"
)

type Animal struct {
Name string
}
type Persion struct {
Animal
}
type Ppersion struct {
*Animal
}

func main() {
animal := Animal{Name: "Cat"}
persion := Persion{animal}
ppersion := Ppersion{&animal}
fmt.Println("Animal:" + animal.Name)
fmt.Println("Persion:" + persion.Name)
fmt.Println("PPersion:" + ppersion.Name)

animal.Name = "Dog"
fmt.Println("------------我是卖萌分割线------------")
fmt.Println("Animal:" + animal.Name)
fmt.Println("Persion:" + persion.Name)
fmt.Println(persion.Animal == animal)
fmt.Println("PPersion:" + ppersion.Name)
fmt.Println(ppersion.Animal == &animal)
}
Animal:Cat
Persion:Cat
PPersion:Cat
------------我是卖萌分割线------------
Animal:Dog
Persion:Cat
false
PPersion:Dog
true

struct/interface转换

下面的代码会报错,因为Stduent未实现People的接口,实现People接口的是*People,所以改法有两种

  1. var peo People = &Stduent{} # 用指针赋值给People
  2. func (stu Stduent) Speak(think string) (talk string) # Stduent对象实现Speak方法
package main

import (
"fmt"
)

type People interface {
Speak(string) string
}

type Stduent struct{}

func (stu *Stduent) Speak(think string) (talk string) {
if think == "bitch" {
talk = "You are a good boy"
} else {
talk = "hi"
}
return
}

func main() {
var peo People = Stduent{}
think := "bitch"
fmt.Println(peo.Speak(think))
}

继承和组合

下面的代码调用的是People.ShowB,因为go没有类似于C++的多态机制

Go中没有继承! 没有继承!没有继承!是叫组合!组合!组合!
这里People是匿名组合People。被组合的类型People所包含的方法虽然升级成了外部类型Teacher这个组合类型的方法,但他们的方法(ShowA())调用时接受者并没有发生变化。
这里仍然是People。毕竟这个People类型并不知道自己会被什么类型组合,当然也就无法调用方法时去使用未知的组合者Teacher类型的功能。
因此这里执行t.ShowA()时,在执行ShowB()时该函数的接受者是People,而非Teacher。具体参见官方文档

type People struct{}

func (p *People) ShowA() {
fmt.Println("showA")
p.ShowB()
}
func (p *People) ShowB() {
fmt.Println("showB")
}

type Teacher struct {
People
}

func (t *Teacher) ShowB() {
fmt.Println("teacher showB")
}

func main() {
t := Teacher{}
t.ShowA()
}

参考

  1. https://segmentfault.com/q/1010000002687684
  2. https://yushuangqi.com/blog/2017/golang-mian-shi-ti-da-an-yujie-xi.html

Golang Http调试

Go Http包的处理函数如下,一个用于输入(Request),一个用于输出(Response),很多第三方的web框架都提供了自己的处理函数定义,但都很方便地适配HandlerFunc

type HandlerFunc func(ResponseWriter, *Request)

为了调试,http包提供了ResponseWriter/Request的模拟

httptest

  1. httptest.NewRequest 用于模拟一个Request请求
  2. httptest.NewRecorder用于模拟一个Response
package main

import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
)

func main() {
handler := func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "<html><body>Hello World![" + r.URL.String() + "]</body></html>")
}

req := httptest.NewRequest("GET", "http://example.com/foo", nil)
w := httptest.NewRecorder()
handler(w, req)

resp := w.Result()
body, _ := ioutil.ReadAll(resp.Body)

fmt.Println(resp.StatusCode)
fmt.Println(resp.Header.Get("Content-Type"))
fmt.Println(string(body))

// Output:
// 200
// text/html; charset=utf-8
// <html><body>Hello World![http://example.com/foo]</body></html>
}

Server

处理模拟请求和响应,Httptest同样提供了服务器的模拟,不过非常简单,没有路由,只能提供单个响应函数(在响应函数自行处理另当别论

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"log"
)

func main() {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, client")
}))
defer ts.Close()

res, err := http.Get(ts.URL)
if err != nil {
log.Fatal(err)
}
greeting, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
log.Fatal(err)
}

fmt.Printf("%s", greeting)
// Output: Hello, client
}

server提供了几个接口,用法有一些简单的区别

// NewServer starts and returns a new Server.
// The caller should call Close when finished, to shut it down.
func NewServer(handler http.Handler) *Server

// NewUnstartedServer returns a new Server but doesn't start it.
//
// After changing its configuration, the caller should call Start or
// StartTLS.
//
// The caller should call Close when finished, to shut it down.
func NewUnstartedServer(handler http.Handler) *Server

// NewTLSServer starts and returns a new Server using TLS.
// The caller should call Close when finished, to shut it down.
func NewTLSServer(handler http.Handler) *Server

启动服务器,并不会阻塞当前gorounte,在内部会开启一个新的gorounte来执行监听任务

func (s *Server) goServe() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.Config.Serve(s.Listener)
}()
}

httptrace

此外,http还包含httptrace,用于监听http请求的各种事件,核心就是一个ClientTrace对象,具体的函数,可以godoc

package main

import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/http/httptrace"
"os"
)

func main() {
server := httptest.NewServer(http.HandlerFunc(http.NotFound))
defer server.Close()
c := http.Client{}
req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
panic(err)
}

trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
fmt.Println("Got Conn")
},
ConnectStart: func(network, addr string) {
fmt.Println("Dial start")
},
ConnectDone: func(network, addr string, err error) {
fmt.Println("Dial done")
},
GotFirstResponseByte: func() {
fmt.Println("First response byte!")
},
WroteHeaders: func() {
fmt.Println("Wrote headers")
},
WroteRequest: func(wr httptrace.WroteRequestInfo) {
fmt.Println("Wrote request", wr)
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
fmt.Println("Starting request!")
resp, err := c.Do(req)
if err != nil {
panic(err)
}
io.Copy(os.Stdout, resp.Body)
fmt.Println("Done!")
}

参考

  1. https://golang.org/src/net/http/httptest/example_test.go
  2. https://golang.org/pkg/net/http/httptrace/
  3. http://www.tuicool.com/articles/nQzqmuZ

Golang Context

从go1.7开始,golang.org/x/net/context包正式作为context包进入了标准库。官方的说明如下

Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.

参考

  1. https://blog.golang.org/context
  2. https://godoc.org/golang.org/x/net/context

Context接口非常简单

// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// 是否有超时,超时context才有
Deadline() (deadline time.Time, ok bool)
// 是否已经结束,通过从channel中读取数据判断,用于被监听的routine
Done() <-chan struct{}
// 当Done返回的ch读取数据(或者被close),可以获取到错误(取消,超时等)
Err() error
// value Context的值,通过key获取
Value(key interface{}) interface{}
}

Context的核心通过channel来作同步,为了理解,最好了解go的同步机制

Go语言并发模型

参考Go Concurrency Patterns: Pipelines and cancellation

和其他语言相比,go对并发的控制源于非常简单,但功能并不弱,常用的包括(但不限于)

  1. sync.Mutex 互斥
  2. sync.WaitGroup
  3. channel

官方推荐channel,并且对此有很好的优化

pipeline

连续单个的管道

package main

import "fmt"

func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func main() {
// 设置流水线
c := gen(2, 3)
out := sq(c)

// 消费输出结果
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}

多个并行的pipe并行处理,并汇聚到下一个pipe,核心是sync.WaitGroup作屏障

package main

import (
"fmt"
"sync"
)

func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}

func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}

func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 为每一个输入channel cs 创建一个 goroutine output
// output 将数据从 c 拷贝到 out,直到 c 关闭,然后 调用 wg.Done
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// 启动一个 goroutine,用于所有 output goroutine结束时,关闭 out
// 该goroutine 必须在 wg.Add 之后启动
go func() {
wg.Wait()
close(out)
}()
return out
}

func main() {
in := gen(2, 3)

// 启动两个 sq 实例,即两个goroutines处理 channel "in" 的数据
c1 := sq(in)
c2 := sq(in)

// merge 函数将 channel c1 和 c2 合并到一起,这段代码会消费 merge 的结果
for n := range merge(c1, c2) {
fmt.Println(n) // 打印 4 9, 或 9 4
}
}

处理中断

通过传递一个用于控制是否退出的channel来实现和谐的中断,这也是Context的核心所在,为了控制多个channel,只需要close channel写端即可唤醒所有的channel读端

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)

// 为 cs 的每一个 channel 创建一个 goroutine
// 这个 goroutine 运行 output,它将数据从 c
// 拷贝到 out,直到 c 关闭,或者 接收到 done
// 的关闭信号。人啊后调用 wg.Done()
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}

关于中断多个channel,参考https://chilts.org/2017/06/12/cancelling-multiple-goroutines

channel作为一种同步机制,有必要了解go的内存模型和(非)缓存的同步差异

go内存模型

参考https://golang.org/ref/mem

Happens Before/After

Happens-before用来指明Go程序里的内存操作的局部顺序。如果一个内存操作事件e1 happens-before e2,则e2 happens-after e1也成立;如果e1不是happens-before e2,也不是happens-after e2,则e1和e2是并发的。

下面这段是核心,也很费解

在这个定义之下,如果以下情况满足,则对变量(v)的内存写操作(w)对一个内存读操作(r)来说允许可见的:

  1. r不在w开始之前发生(可以是之后或并发);
  2. w和r之间没有另一个写操作(w’)发生;

为了保证对变量(v)的一个特定写操作(w)对一个读操作(r)可见,就需要确保w是r唯一允许的写操作,于是如果以下情况满足,则对变量(v)的内存写操作(w)对一个内存读操作(r)来说保证可见的:

  1. w在r开始之前发生;
  2. 所有其它对v的写操作只在w之前或r之后发生;

简单地说,对于【允许可见】表示有可能读到正确的写数据,但是由于存在读脏数据的可能,(假定读操作是有意义的)所以极端情况,编译器甚至优化掉写代码逻辑

而【保证可见】就确保一定能读到正确的写数据,所以编译器会确保写操作在读操作前执行

Channel 同步

  1. 对一个Channel的发送操作(send) happens-before 相应Channel的接收操作完成
  2. 关闭一个Channel happens-before 从该Channel接收到最后的返回值0
  3. 不带缓冲的Channel的接收操作(receive) happens-before 相应Channel的发送操作完成
package main

var c = make(chan int, 10)
var a string

func f() {
a = "hello, world"
c <- 0
}
func main() {
go f()
<-c
print(a)
}

上述代码可以确保输出hello, world,因为【a = “hello, world”】 happens-before 【c <- 0】,【print(a)】 happens-after 【<-c】, 根据上面的规则1)以及happens-before的可传递性,a = “hello, world” happens-beforeprint(a)。

package main

var c = make(chan int, 10)
var a string

func f() {
a = "hello, world"
close(c) ///////
}
func main() {
go f()
<-c
print(a)
}

根据规则2)把c<-0替换成close(c)也能保证输出hello,world,因为关闭操作在<-c接收到0之前发送。

package main

var c = make(chan int)
var a string

func f() {
a = "hello, world"
<-c
}
func main() {
go f()
c <- 0
print(a)
}

根据规则3),因为c是不带缓冲的【Channel,a = “hello, world”】 happens-before 【<-c】 happens-before 【<- 0】 happens-before 【print(a)】,

但如果c是缓冲队列,如定义c = make(chan int, 1), 那结果就不确定了。

Context 解决的问题

在并发模型中,经常会开启几个go routine来并发处理任务,这就提出了能够中断(取消)的需求,虽然前面提到的方案都是可行的(实际上Context也就是对上述方案的封装),但抽象得并不好,

还有一个很大的问题是,如果调用的go routine内部又开启了多个go routine,如何和谐的递归触发中断呢?

除了中断,另外一个常见的需求是超时,以及传递参数

go routine可以直接传参数,Context valueCtx的优势在于,父routine的value可以继承到子routine(若key不冲突),只需要(以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。)

为此,Context提供了几个工具函数

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

  1. WithTimeout对WithDeadline的一个简单转换封装,就是相对时间/绝对实际的差别,它继承自CancelCtx
  2. CancelCtx是可以取消Context,另一个参数是一个函数,调用函数实际上就是关闭(close)Done的channel。
  3. Context提供了空Context(emptyCtx),无实际意义,除了做了根Context
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}

var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

使用

参考http://www.flysnow.org/2017/05/12/go-in-action-go-context.html

package main

import (
"fmt"
"time"
"context"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("监控退出,停止了...")
return
default:
fmt.Println("goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}(ctx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}

Context控制多个goroutine

package main

import (
"fmt"
"time"
"context"
)

func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"监控退出,停止了...")
return
default:
fmt.Println(name,"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"【监控1】")
go watch(ctx,"【监控2】")
go watch(ctx,"【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}

CancelCtx

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context

done chan struct{} // closed by the first cancel call.

mu sync.Mutex
// 管理routine树状结构(bool无意义,只因无类似stl set的数据结构)
children map[canceler]bool // set to nil by the first cancel call
// 退出后,标记原因的err
err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
// 用于子Routine监听退出的channel
return c.done
}

func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}

chilren存的是一个canceler接口

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
// cancel确保可以递归的cancel子routine
cancel(removeFromParent bool, err error)
// 参见propagateCancel
Done() <-chan struct{}
}

关闭接口

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
// 返回的第二个参数是调用了cancel函数的函数
return &c, func() { c.cancel(true, Canceled) }
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
// 设置了err就表示已经cancel过,不能重复cancel
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
// 关闭channel,唤醒所有监听的子routine
close(c.done)
// 递归cancel子 routine
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()

// 这里作为一个bool参数,是因为并非每种ctx都需要removeChild。比如valueCtx就不需要,它只能寄居在另外两种里面
if removeFromParent {
removeChild(c.Context, c)
}
}

正因为如此,所以提供了工具函数,不停地网上遍历,找到最近的一个具有chilren的context

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx: // 人家继承自cancelCtx
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}

从父context中删除就使用了这个工具函数,也就是说他并不是他parent的child

// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
// 这里的p并不一定是他的直接parent
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}

构建树

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
// parent无可退出,那自认也就不能包含什么chilren
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent已经cancel,立即cancel child
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 构建context树
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
// 按理不会走到这里来,既然parent.Done()不为空,那么肯定实现了cancel/timerCtx
// 然后到为了确保自行扩展导致的异常情况?????
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

timerCtx

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
// 如果当前的过期时间比parent还后,那么就不需要过期了,简单地做个cancelCtx就行了
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
// 立即执行
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// 回调执行
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

valueCtx


// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}

func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
// 往上递归,实现累加传递参数
return c.Context.Value(key)
}

Golang文档

Godoc

Godoc 的概念同 Python 的 Docstring 和 Java 的 Javadoc类似,但是设计上更为简单。

如果需要将注释转化成HTML形式的文档,Godoc用户还需要掌握一些额外的格式化规则:

  1. 段落以空行格开
  2. 预格式化的文档应该缩进(参考gob包的doc.go
  3. URL将被转化为HTML链接,无需其它的特殊标记

go doc

go doc命令可以打印附于Go语言程序实体上的文档。我们可以通过把程序实体的标识符作为该命令的参数来达到查看其文档的目的。

king@king:~/code/go/src/test$ godoc fmt Printf
use 'godoc cmd/fmt' for documentation on the fmt command

func Printf(format string, a ...interface{}) (n int, err error)
Printf formats according to a format specifier and writes to standard
output. It returns the number of bytes written and any write error
encountered.
king@king:~/code/go/src/test$ go doc fmt Printf
func Printf(format string, a ...interface{}) (n int, err error)
Printf formats according to a format specifier and writes to standard
output. It returns the number of bytes written and any write error
encountered.

也可以处理当前目录下的模块

king@king:~/code/go/src/test$ go doc Add2
func Add2(a, b int) int
一个加法实现 返回a+b的值

king@king:~/code/go/src/test$ go doc bb
package bb // import "test/bb"

提供的常用库,有一些常用的方法,方便使用

提供的常用库,有一些常用的方法,方便使用1222

func Add(a, b int) int
func Add2(a, b int) int

参数说明

标记名称 标记描述
-c 加入此标记后会使go doc命令区分参数中字母的大小写。默认情况下,命令是大小写不敏感的。
-cmd 加入此标记后会使go doc命令同时打印出main包中的可导出的程序实体(其名称的首字母大写)的文档。默认情况下,这部分文档是不会被打印出来的。
-u 加入此标记后会使go doc命令同时打印出不可导出的程序实体(其名称的首字母小写)的文档。默认情况下,这部分文档是不会被打印出来的。

godoc

king@king:~/code/go/src/test$ godoc fmt
use 'godoc cmd/fmt' for documentation on the fmt command

PACKAGE DOCUMENTATION

package fmt
import "fmt"

Package fmt implements formatted I/O with functions analogous to C's
printf and scanf. The format 'verbs' are derived from C's but are
simpler.
king@king:~/code/go/src/test$ godoc fmt Printf
use 'godoc cmd/fmt' for documentation on the fmt command

func Printf(format string, a ...interface{}) (n int, err error)
Printf formats according to a format specifier and writes to standard
output. It returns the number of bytes written and any write error
encountered.
king@king:~/code/go/src/test$ godoc fmt Printf Println
use 'godoc cmd/fmt' for documentation on the fmt command

func Printf(format string, a ...interface{}) (n int, err error)
Printf formats according to a format specifier and writes to standard
output. It returns the number of bytes written and any write error
encountered.

func Println(a ...interface{}) (n int, err error)
Println formats using the default formats for its operands and writes to
standard output. Spaces are always added between operands and a newline
is appended. It returns the number of bytes written and any write error
encountered.

最牛叉的是http命令,可以直接打开浏览器看文档和代码,生成的文档包含本机GOPATH下的所有代码

godoc -http=:6060

swagger

对于Resful Api,用swagger ui可以很好的组织和测试接口,请参考项目https://github.com/qjw/go-swagger-doc

Doxygen

Doxygen是一种开源跨平台的,以类似JavaDoc风格描述的文档系统,完全支持C、C++、Java、Objective-C和IDL语言,部分支持PHP、C#。注释的语法与Qt-Doc、KDoc和JavaDoc兼容。

参考旧文http://qjw.qiujinwu.com/blog/2013/06/02/doxygen_study

参考

  1. http://octman.com/blog/2014-02-24-godoc-documenting-go-code/
  2. http://wiki.jikexueyuan.com/project/go-command-tutorial/0.5.html
  3. https://mikespook.com/2011/04/%E3%80%90%E7%BF%BB%E8%AF%91%E3%80%91godoc%EF%BC%9A%E6%96%87%E6%A1%A3%E5%8C%96-go-%E4%BB%A3%E7%A0%81/