其他
约 916 字大约 3 分钟
2025-07-07
TLS
对于基本的 Conn 类型或在 Reader/Writer 配置中,可以在Dialer中设置TLS选项。如果 TLS 字段为空,则它将不启用TLS 连接。
注意:不在Conn/Reder/Writer上配置TLS,连接到启用TLS的Kafka集群,可能会出现io.ErrUnexpectedEOF错误。
Connection
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})Writer
创建Writer时可以按如下方式指定TLS配置。
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{}, // 指定TLS配置
},
}SASL
可以在Dialer上指定一个选项以使用SASL身份验证。Dialer可以直接用来打开一个 Conn,也可以通过它们各自的配置传递给一个 Reader 或 Writer。如果 SASLMechanism字段为 nil,则不会使用 SASL 进行身份验证。
SASL 身份验证类型
明文
mechanism := plain.Mechanism{
Username: "username",
Password: "password",
}SCRAM
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}Connection
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")Reader
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})Writer
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: sharedTransport,
}Client
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {
panic(err)
}
// Transport 负责管理连接池和其他资源,
// 通常最好的使用方式是创建后在应用程序中共享使用它们。
sharedTransport := &kafka.Transport{
SASL: mechanism,
}
client := &kafka.Client{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Timeout: 10 * time.Second,
Transport: sharedTransport,
}Balancer
kafka-go实现了多种负载均衡策略。特别是当你从其他Kafka库迁移过来时,你可以按如下说明选择合适的Balancer实现。
Sarama
如果从 sarama 切换过来,并且需要/希望使用相同的算法进行消息分区,则可以使用kafka.Hash或kafka.ReferenceHash。
- kafka.Hash = sarama.NewHashPartitioner
- kafka.ReferenceHash = sarama.NewReferenceHashPartitioner
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
}librdkafka和confluent-kafka-go
kafka.CRC32Balancer与librdkafka默认的consistent_random策略表现一致。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.CRC32Balancer{},
}Java
使用kafka.Murmur2Balancer可以获得与默认Java客户端相同的策略。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: kafka.Murmur2Balancer{},
}Compression
可以通过设置Compression字段在Writer上启用压缩:
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Compression: kafka.Snappy,
}Reader 将通过检查消息属性来确定消费的消息是否被压缩。
Logging
想要记录Reader/Writer类型的操作,可以在创建时配置日志记录器。
kafka-go中的Logger是一个接口类型。
type Logger interface {
Printf(string, ...interface{})
}并且提供了一个LoggerFunc类型,帮我们实现了Logger接口。
type LoggerFunc func(string, ...interface{})
func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }Reader
借助kafka.LoggerFunc我们可以自定义一个Logger。
// 自定义一个Logger
func logf(msg string, a ...interface{}) {
fmt.Printf(msg, a...)
fmt.Println()
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "q1mi-topic",
Partition: 0,
Logger: kafka.LoggerFunc(logf),
ErrorLogger: kafka.LoggerFunc(logf),
})Writer
也可以直接使用第三方日志库,例如下面示例代码中使用了zap日志库。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "q1mi-topic",
Logger: kafka.LoggerFunc(zap.NewExample().Sugar().Infof),
ErrorLogger: kafka.LoggerFunc(zap.NewExample().Sugar().Errorf),
}