一 说明
K8S服务编排工具,在各个公司基本都会用到,而为了部署Emqx更方便快捷,也更便于统一管理,所以我们的Emqx集群直接通过K8S部署。
当然,并非直接把Emqx直接部署到用户跑微服务的worker节点上。k8s的worker节点,我们一般都会打一些标签,用于区分用来部署什么。而针对emqx,我们增加了三台4核8G的机器作为worker节点,打上了zone=emqx的标签,专用于emqx。
二 Emqx的部署
部署Emqx之前,先确保k8s的master节点安装了helm工具,因为使用helm部署更加方便快捷,而且emqx的开源项目在github在有维护一套helm的charts,便于直接拿来修改部署。安装可以参考文章:K8S的helm包管理器工具部署
安装好Helm之后,可以到github上找到Emqx开源项目,copydeploy/charts/emqx
目录到k8s的master的上,调整配置之后即可执行命令 helm install emqx .
启动emqx集群,但是这里的chart并没有支持ssl,于是,我在原有chart的基础上,增加了ssl的支持。项目维护在gitee:emqx-chart。
其中的SSL证书,在values.yaml中修改,sslCacertPem
为自签CA证书,sslKeyPem
为自签服务端证书key,sslCertPem
为自签服务端证书,以下证书是我自己自签测试用的,你们使用前请重新自签证书,自签的方式参考文章:OpenSSL自签证书生成。其中截取部分values.yaml中的配置如下:
1 | emqxConfig: |
部署成功之后查看k8s资源如下:1
2
3
4
5
6
7
8# kubectl get pod | grep emqx
emqx-0 1/1 Running 0 8d
emqx-1 1/1 Running 0 8d
emqx-2 1/1 Running 0 8d
# kubectl get svc | grep emqx
emqx NodePort 172.17.76.33 <none> 1883:30183/TCP,8883:30883/TCP,8081:19884/TCP,8083:30346/TCP,8084:6011/TCP,18083:18958/TCP 41d
emqx-headless ClusterIP None <none> 1883/TCP,8883/TCP,8081/TCP,8083/TCP,8084/TCP,18083/TCP,4370/TCP 41d
查看集群状态如下1
2
3
4
5
6# kubectl exec emqx-0 /opt/emqx/bin/emqx_ctl cluster status
Cluster status: #{running_nodes =>
['emqx@emqx-0.emqx-headless.default.svc.cluster.local',
'emqx@emqx-1.emqx-headless.default.svc.cluster.local',
'emqx@emqx-2.emqx-headless.default.svc.cluster.local'],
stopped_nodes => []}
三 其他
上例中的加密方式比较复杂,为:pbkdf2,sha256,901,24
,对应的工具类如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106import lombok.Data;
import lombok.experimental.Accessors;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.KeySpec;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Mqtt工具类
*
* @author mafgwo
* @since 2022/03/07
*/
public class MqttUtil {
private static final int KEY_LENGTH = 24 * 8;
private static final int SALT_LENGTH = 12;
private static final int ITERATIONS = 901;
private static final char[] HEX_CHARS = {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
private static final SecureRandom RANDOM_GENERATOR = new SecureRandom();
public static void main(String[] args) {
System.out.println(createMqttPwd("test3@mqtt", "7xA1U+v2YmL4YNhY"));
}
/**
* 根据 PBKDF2WithHmacSHA256 算法对密码进行加密
* 对应Emqx的加密 auth.mysql.password_hash=pbkdf2,sha256,901,24 时的emqx auth密码
*
* @param loginPwd
* @return
*/
public static PwdInfo createMqttPwd(String loginPwd, String salt) {
PwdInfo pwdInfo = new PwdInfo();
if (salt == null) {
byte[] someBytes = new byte[SALT_LENGTH];
RANDOM_GENERATOR.nextBytes(someBytes);
String encodedSalt = Base64.getEncoder().encodeToString(someBytes);
pwdInfo.setSalt(encodedSalt);
} else {
pwdInfo.setSalt(salt);
}
SecretKeyFactory f = null;
try {
f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
} catch (NoSuchAlgorithmException ex) {
Logger.getLogger(MqttUtil.class.getName()).log(Level.SEVERE, null, ex);
throw new RuntimeException("创建mqtt密码异常");
}
KeySpec ks = new PBEKeySpec(loginPwd.toCharArray(), pwdInfo.getSalt().getBytes(), ITERATIONS, KEY_LENGTH);
SecretKey s;
try {
s = f.generateSecret(ks);
byte[] encodedBytes = s.getEncoded();
pwdInfo.setEmqxPw(bytes2hexStr(encodedBytes).toLowerCase());
String encodedKey = Base64.getEncoder().encodeToString(encodedBytes);
String mosPw = "PBKDF2$sha256$" + ITERATIONS + "$" + pwdInfo.getSalt() + "$" + encodedKey;
pwdInfo.setMosPw(mosPw);
} catch (InvalidKeySpecException ex) {
Logger.getLogger(MqttUtil.class.getName()).log(Level.SEVERE, null, ex);
throw new RuntimeException("创建mqtt密码异常");
}
return pwdInfo;
}
/**
* byte[]数组转十六进制
*/
private static String bytes2hexStr(byte[] bytes) {
int len = bytes.length;
if (len==0) {
return "";
}
char[] cbuf = new char[len*2];
for (int i=0; i<len; i++) {
int x = i*2;
cbuf[x] = HEX_CHARS[(bytes[i] >>> 4) & 0xf];
cbuf[x+1] = HEX_CHARS[bytes[i] & 0xf];
}
return new String(cbuf);
}
true) (chain =
public static class PwdInfo {
/**
* 盐
*/
private String salt;
/**
* mosquitto的auth密码
*/
private String mosPw;
/**
* emqx的auth密码
*/
private String emqxPw;
}
}
直接执行后看到的结果如下:1
MqttUtil.PwdInfo(salt=7xA1U+v2YmL4YNhY, mosPw=PBKDF2$sha256$901$7xA1U+v2YmL4YNhY$gH7EBJq8HVtIBx1fYQ4tzgkDOrS8qlBS, emqxPw=807ec4049abc1d5b48071d5f610e2dce09033ab4bcaa5052)