External Network Access Configuration
Enable cluster authentication
View the SASL endpoints
Configure external forwarding
1. Create a cloud host in the same subnet as the UKafka cluster
Note: this document is based on CentOS 7.9.
2. Install nginx on the cloud host
yum install nginx nginx-all-modules.noarch
3. Configure the nginx proxy
- Edit /etc/nginx/nginx.conf and add a stream block at the top level of the file (alongside the http block, not inside it):
stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';
    access_log /var/log/nginx/tcp-access.log proxy;
    open_log_file_cache off;
    # Keep all TCP proxy configs in one directory for easier management
    include /etc/nginx/tcpConf.d/*.conf;
}
- Create the /etc/nginx/tcpConf.d/ directory:
mkdir -p /etc/nginx/tcpConf.d/
- Edit the /etc/nginx/tcpConf.d/kafka.conf configuration file, with one upstream and one server block per broker:
upstream tcp9093 {
    server 10.23.26.105:9093;
}
upstream tcp9094 {
    server 10.23.180.35:9094;
}
upstream tcp9095 {
    server 10.23.202.164:9095;
}
server {
    listen 9093;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9093;
}
server {
    listen 9094;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9094;
}
server {
    listen 9095;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9095;
}
- Start nginx:
systemctl start nginx.service
- Check the nginx service status:
systemctl status nginx.service
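The upstream/server pairs in kafka.conf differ only in the broker address and port, so the file can be generated from a broker list instead of being written by hand. The following is a minimal sketch rather than part of the original procedure: gen_kafka_conf is a hypothetical helper name, and the broker addresses are the sample values used in this document.

```shell
#!/usr/bin/env bash
# Sketch: print one upstream/server pair per broker, in the same shape as
# the hand-written kafka.conf. gen_kafka_conf is a hypothetical helper.
gen_kafka_conf() {
  local broker port
  for broker in "$@"; do
    port="${broker##*:}"   # text after the last colon, e.g. 9093
    cat <<EOF
upstream tcp${port} {
    server ${broker};
}
server {
    listen ${port};
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp${port};
}
EOF
  done
}

# Sample broker addresses from this document; replace them with your own.
gen_kafka_conf 10.23.26.105:9093 10.23.180.35:9094 10.23.202.164:9095
```

Redirecting the output to /etc/nginx/tcpConf.d/kafka.conf (as root) should give an equivalent file. Running `nginx -t` before starting the service checks the syntax, and `ss -lnt` after the start should list listeners on 9093, 9094 and 9095.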
4. Configure the host firewall
The host firewall must allow inbound traffic on the forwarding ports 9093, 9094, and 9095.
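How the ports are opened depends on the firewall in use, which this document does not name. As a sketch, assuming the host runs firewalld (the CentOS 7 default), the following prints the commands that would allow the three ports. print_firewall_cmds is a hypothetical helper; it only prints, so the commands can be reviewed before anything changes. If a cloud-level firewall or security group also applies to the host, the same ports would need to be allowed there as well.

```shell
#!/usr/bin/env bash
# Assumption: firewalld manages the host firewall.
# print_firewall_cmds is a hypothetical helper that prints, but does not run,
# the firewall-cmd invocations for the given TCP ports.
print_firewall_cmds() {
  local port
  for port in "$@"; do
    echo "firewall-cmd --permanent --add-port=${port}/tcp"
  done
  # Permanent rules take effect only after a reload.
  echo "firewall-cmd --reload"
}

print_firewall_cmds 9093 9094 9095
```

After review, the output can be piped into a root shell, for example `print_firewall_cmds 9093 9094 9095 | sudo sh`.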
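External access
1. Configure the local hosts file
Edit the local /etc/hosts file so that every hostname on which UKafka listens for SASL_PLAINTEXT resolves to the public IP of the forwarding cloud host. Clients connect to the hostnames and ports that the brokers advertise, so these names must reach the forwarder, which maps each port to one broker: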
113.31.114.125 ukafka-q0j01x5g-kafka1
113.31.114.125 ukafka-q0j01x5g-kafka2
113.31.114.125 ukafka-q0j01x5g-kafka3
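Appending these entries by script helps avoid duplicates when the step is repeated. The following is a minimal sketch: add_hosts_entries is a hypothetical helper, and the demonstration writes to a scratch file rather than to /etc/hosts.

```shell
#!/usr/bin/env bash
# add_hosts_entries FILE IP NAME...   (hypothetical helper)
# Appends "IP NAME" for each NAME that is not already present in FILE.
add_hosts_entries() {
  local hosts_file="$1" ip="$2" name
  shift 2
  for name in "$@"; do
    # Skip names that already appear as the last field of a line.
    if ! grep -qE "[[:space:]]${name}\$" "$hosts_file" 2>/dev/null; then
      printf '%s %s\n' "$ip" "$name" >> "$hosts_file"
    fi
  done
}

# Demonstration against a scratch file, using the sample values from this document.
scratch="$(mktemp)"
add_hosts_entries "$scratch" 113.31.114.125 \
  ukafka-q0j01x5g-kafka1 ukafka-q0j01x5g-kafka2 ukafka-q0j01x5g-kafka3
# A second run for an existing name adds nothing.
add_hosts_entries "$scratch" 113.31.114.125 ukafka-q0j01x5g-kafka1
cat "$scratch"
rm -f "$scratch"
```

Pointing the function at /etc/hosts requires root. Note that the sketch skips a name that is already listed even if it maps to a different IP, so stale entries still need to be corrected by hand.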
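2. Access with the Kafka command-line tools
Download and configure the Kafka command-line tools
The following uses the Kafka 2.6.1 command-line tools as an example; download the version that matches your instance.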
# Download the command-line tools
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz
# Extract the archive
tar -zxvf kafka_2.13-2.6.1.tgz
# Enter the root directory of the command-line tools
cd kafka_2.13-2.6.1
Create config/kafka_client_jaas.conf as follows, setting username and password to the ones configured when cluster authentication was enabled in the console:
cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin_pass";
};
EOF
Send messages
- Edit bin/kafka-console-producer.sh and add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
- Edit config/producer.properties and set security.protocol and sasl.mechanism:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-producer.sh to send messages:
bin/kafka-console-producer.sh --producer.config ./config/producer.properties --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>
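Instead of editing the launcher script, the JAAS file can also be supplied through the KAFKA_OPTS environment variable, which kafka-run-class.sh passes to the JVM. This is a minimal sketch of that alternative, assuming the current directory is the tool root and config/kafka_client_jaas.conf was created as described above.

```shell
#!/usr/bin/env bash
# Point the JVM at the JAAS file without modifying anything under bin/.
# An absolute path avoids depending on the directory the tool is started from.
export KAFKA_OPTS="-Djava.security.auth.login.config=$(pwd)/config/kafka_client_jaas.conf"
echo "$KAFKA_OPTS"

# With the variable exported, the unmodified script can be run in the same shell:
# bin/kafka-console-producer.sh --producer.config ./config/producer.properties --topic foo \
#   --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
```

The variable applies to every Kafka script started from that shell, so the console consumer picks it up as well. Because kafka-run-class.sh expands the variable unquoted, a tool path that contains spaces is likely to break this approach.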
Receive messages
- Edit bin/kafka-console-consumer.sh and add the java.security.auth.login.config parameter:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
- Edit config/consumer.properties and set security.protocol and sasl.mechanism:
group.id=test-consumer-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- Run bin/kafka-console-consumer.sh to receive messages:
bin/kafka-console-consumer.sh --consumer.config ./config/consumer.properties --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar
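Kafka clients also accept the JAAS configuration inline through the sasl.jaas.config property, so one properties file can serve both console tools without a separate JAAS file or script changes. The following is a minimal sketch: write_client_properties is a hypothetical helper, and admin / admin_pass are the sample credentials used in this document.

```shell
#!/usr/bin/env bash
# write_client_properties FILE USER PASS   (hypothetical helper)
# Writes a client properties file with the SASL/PLAIN settings inline.
write_client_properties() {
  local file="$1" user="$2" pass="$3"
  cat > "$file" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${user}" password="${pass}";
EOF
  chmod 600 "$file"   # the file holds a plaintext password
}

# Demonstration against a scratch file.
props="$(mktemp)"
write_client_properties "$props" admin admin_pass
cat "$props"
rm -f "$props"
```

Written to a real path (for example a hypothetical config/client-sasl.properties), the file can be passed to the producer with --producer.config and to the consumer with --consumer.config. Passwords containing double quotes or backslashes would need escaping, which this sketch does not handle.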
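3. Access with the Java SDK
The Java SDK can authenticate in several ways, for example through a kafka_client_jaas.conf file or by setting the sasl.jaas.config property directly; the code examples below show three options.
If you use a kafka_client_jaas.conf file, set username and password to the ones configured when cluster authentication was enabled in the console: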
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin_pass";
};
Producer example
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        // Options 1 to 3 are alternatives; only one of them is needed.
        // Option 1: set the JVM system property that points to the JAAS config file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
        // Option 2: pass the JVM startup argument `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        // security.protocol and sasl.mechanism are required
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Option 3: set the JAAS configuration inline through Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "foo";
        String value = "this is the message's value";
        ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, value);
        // try-with-resources closes the producer when the block exits
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            // Block until the broker acknowledges the record
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer example
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) throws Exception {
        // Options 1 to 3 are alternatives; only one of them is needed.
        // Option 1: set the JVM system property that points to the JAAS config file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");
        // Option 2: pass the JVM startup argument `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`
        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        props.put("group.id", "test_group");
        props.put("auto.offset.reset", "earliest");
        // security.protocol and sasl.mechanism are required
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Option 3: set the JAAS configuration inline through Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> subscribedTopics = new ArrayList<>();
        subscribedTopics.add("foo");
        consumer.subscribe(subscribedTopics);
        while (true) {
            try {
                // poll(long) is deprecated since Kafka 2.0; use the Duration overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                            record.topic(), record.partition(), record.offset(), record.value()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
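Kafka SDKs for other languages differ in how they support and configure authentication; consult the documentation of the SDK you use for the exact settings.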