chore: reva bump v2.33 (#884)

Author: Viktor Scharf
Date: 2025-05-19 16:39:56 +02:00
Committed by: GitHub
Parent: bf9e80a335
Commit: 13049b7b9d
24 changed files with 241 additions and 183 deletions

4
go.mod
View File

@@ -64,7 +64,7 @@ require (
github.com/onsi/gomega v1.37.0
github.com/open-policy-agent/opa v1.4.2
github.com/opencloud-eu/libre-graph-api-go v1.0.5
github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d
github.com/opencloud-eu/reva/v2 v2.33.0
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.10
@@ -285,7 +285,7 @@ require (
github.com/rs/xid v1.6.0 // indirect
github.com/russellhaering/goxmldsig v1.5.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/kafka-go v0.4.47 // indirect
github.com/segmentio/kafka-go v0.4.48 // indirect
github.com/segmentio/ksuid v1.0.4 // indirect
github.com/sercand/kuberesolver/v5 v5.1.1 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect

8
go.sum
View File

@@ -862,8 +862,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527
github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/opencloud-eu/libre-graph-api-go v1.0.5 h1:Wv09oIjCF8zRN8roPzjXXo6ORp2h87/YhmdXE9N4p/A=
github.com/opencloud-eu/libre-graph-api-go v1.0.5/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d h1:c7AGNgYPm4Ix3YU1vptCN01HjZ1ZpRw91QrqJnSEkxM=
github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d/go.mod h1:moFklKM4+TwF8iqeFnX64/8TlbqFSsfAkV30Q0FCTt4=
github.com/opencloud-eu/reva/v2 v2.33.0 h1:uFbt4BC21gU0bbrp4CHABhWR4Xk5H+e2kA7KrEUXGEQ=
github.com/opencloud-eu/reva/v2 v2.33.0/go.mod h1:wRZ/7eJTOfkhjtDlYcNt83Loz2drfXssh+5JMDcq/5o=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
@@ -989,8 +989,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
github.com/sacloud/libsacloud v1.36.2/go.mod h1:P7YAOVmnIn3DKHqCZcUKYUXmSwGBm3yS7IBEjKVSrjg=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.7.0.20210127161313-bd30bebeac4f/go.mod h1:CJJ5VAbozOl0yEw7nHB9+7BXTJbIn6h7W+f6Gau5IP8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs=
github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY=

View File

@@ -4,4 +4,4 @@ test:
go test -race -cover ./...
docker:
docker-compose up -d
docker compose up -d

View File

@@ -108,7 +108,7 @@ if err := conn.Close(); err != nil {
```
### To Create Topics
By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_AUTO_CREATE_TOPICS_ENABLE='true'` in the wurstmeister/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'` in the bitnami/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
```go
// to create topics when auto.create.topics.enable='true'
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
@@ -797,3 +797,8 @@ KAFKA_VERSION=2.3.1 \
KAFKA_SKIP_NETTEST=1 \
go test -race ./...
```
(or) to clean up the cached test results and run tests:
```
go clean -cache && make test
```

View File

@@ -46,7 +46,7 @@ func (batch *Batch) Throttle() time.Duration {
return batch.throttle
}
// Watermark returns the current highest watermark in a partition.
// HighWaterMark returns the current highest watermark in a partition.
func (batch *Batch) HighWaterMark() int64 {
return batch.highWaterMark
}
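
The corrected doc comment now matches the exported name, `Batch.HighWaterMark`. As a rough illustration of what the value is good for, the sketch below estimates per-partition lag from a leader connection; the broker address, topic, and partition are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the partition leader directly; address, topic, and partition
	// below are placeholders.
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	batch := conn.ReadBatch(10e3, 1e6) // fetch 10 KB minimum, 1 MB maximum
	defer batch.Close()

	msg, err := batch.ReadMessage()
	if err != nil {
		log.Fatal(err)
	}

	// Approximate lag for this partition: messages between the one just
	// read and the highest offset the broker has committed.
	lag := batch.HighWaterMark() - msg.Offset - 1
	fmt.Printf("offset=%d highWaterMark=%d approx lag=%d\n", msg.Offset, batch.HighWaterMark(), lag)
}
```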

View File

@@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
_, err := c.deleteTopics(deleteTopicsRequestV0{
_, err := c.deleteTopics(deleteTopicsRequest{
Topics: topics,
})
return err
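
`Conn.DeleteTopics` keeps its exported signature; only the request type it builds internally changed. A minimal caller-side sketch, assuming a reachable broker at a placeholder address and placeholder topic names:

```go
package main

import (
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker address and topic names.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// The connection negotiates the DeleteTopics API version (v0 or v1)
	// before sending the request.
	if err := conn.DeleteTopics("events-v1", "events-v2"); err != nil {
		log.Fatal(err)
	}
}
```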
@@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
var response joinGroupResponseV1
func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
version, err := c.negotiateVersion(joinGroup, v1, v2)
if err != nil {
return joinGroupResponse{}, err
}
err := c.writeOperation(
response := joinGroupResponse{v: version}
err = c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(joinGroup, v1, id, request)
return c.writeRequest(joinGroup, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
},
)
if err != nil {
return joinGroupResponseV1{}, err
return joinGroupResponse{}, err
}
if response.ErrorCode != 0 {
return joinGroupResponseV1{}, Error(response.ErrorCode)
return joinGroupResponse{}, Error(response.ErrorCode)
}
return response, nil

View File

@@ -527,7 +527,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
})
return
}
@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
type coordinator interface {
io.Closer
findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
joinGroup(joinGroupRequest) (joinGroupResponse, error)
syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
return t.conn.findCoordinator(req)
}
func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
// in the case of join group, the consumer group coordinator may wait up
// to rebalance timeout in order to wait for all members to join.
if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
return joinGroupResponseV1{}, err
return joinGroupResponse{}, err
}
return t.conn.joinGroup(req)
}
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
request, err := cg.makeJoinGroupRequest(memberID)
if err != nil {
return "", 0, nil, err
}
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
// request.
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
request := joinGroupRequestV1{
func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
request := joinGroupRequest{
GroupID: cg.config.ID,
MemberID: memberID,
SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
cg.withLogger(func(l Logger) {
l.Printf("selected as leader for group, %s\n", cg.config.ID)
})
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
}
// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
members := make([]GroupMember, 0, len(in))
for _, item := range in {
metadata := groupMetadata{}

View File

@@ -10,7 +10,7 @@ import (
"github.com/segmentio/kafka-go/protocol/createtopics"
)
// CreateTopicRequests represents a request sent to a kafka broker to create
// CreateTopicsRequest represents a request sent to a kafka broker to create
// new topics.
type CreateTopicsRequest struct {
// Address of the kafka broker to send the request to.
@@ -27,7 +27,7 @@ type CreateTopicsRequest struct {
ValidateOnly bool
}
// CreateTopicResponse represents a response from a kafka broker to a topic
// CreateTopicsResponse represents a response from a kafka broker to a topic
// creation request.
type CreateTopicsResponse struct {
// The amount of time that the broker throttled the request.
@@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
}
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsRequestV0 struct {
type createTopicsRequest struct {
v apiVersion // v0, v1, v2
// Topics contains an array of single topic creation requests. Cannot
// have multiple entries for the same topic.
Topics []createTopicsRequestV0Topic
@@ -270,86 +272,136 @@ type createTopicsRequestV0 struct {
// Timeout ms to wait for a topic to be completely created on the
// controller node. Values <= 0 will trigger topic creation and return immediately
Timeout int32
// If true, check that the topics can be created as specified, but don't create anything.
// Internal use only for Kafka 4.0 support.
ValidateOnly bool
}
func (t createTopicsRequestV0) size() int32 {
return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
func (t createTopicsRequest) size() int32 {
sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
sizeofInt32(t.Timeout)
if t.v >= v1 {
sz += 1
}
return sz
}
func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
func (t createTopicsRequest) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
wb.writeInt32(t.Timeout)
if t.v >= v1 {
wb.writeBool(t.ValidateOnly)
}
}
type createTopicsResponseV0TopicError struct {
type createTopicsResponseTopicError struct {
v apiVersion
// Topic name
Topic string
// ErrorCode holds response error code
ErrorCode int16
// ErrorMessage holds response error message string
ErrorMessage string
}
func (t createTopicsResponseV0TopicError) size() int32 {
return sizeofString(t.Topic) +
func (t createTopicsResponseTopicError) size() int32 {
sz := sizeofString(t.Topic) +
sizeofInt16(t.ErrorCode)
if t.v >= v1 {
sz += sizeofString(t.ErrorMessage)
}
return sz
}
func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) {
wb.writeString(t.Topic)
wb.writeInt16(t.ErrorCode)
if t.v >= v1 {
wb.writeString(t.ErrorMessage)
}
}
func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.Topic); err != nil {
return
}
if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
return
}
if t.v >= v1 {
if remain, err = readString(r, remain, &t.ErrorMessage); err != nil {
return
}
}
return
}
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsResponseV0 struct {
TopicErrors []createTopicsResponseV0TopicError
type createTopicsResponse struct {
v apiVersion
ThrottleTime int32 // v2+
TopicErrors []createTopicsResponseTopicError
}
func (t createTopicsResponseV0) size() int32 {
return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
func (t createTopicsResponse) size() int32 {
sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
if t.v >= v2 {
sz += sizeofInt32(t.ThrottleTime)
}
return sz
}
func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
func (t createTopicsResponse) writeTo(wb *writeBuffer) {
if t.v >= v2 {
wb.writeInt32(t.ThrottleTime)
}
wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) })
}
func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var topic createTopicsResponseV0TopicError
if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil {
topic := createTopicsResponseTopicError{v: t.v}
if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil {
return
}
t.TopicErrors = append(t.TopicErrors, topic)
return
}
if remain, err = readArrayWith(r, size, fn); err != nil {
remain = size
if t.v >= v2 {
if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
return
}
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}
return
}
func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
var response createTopicsResponseV0
func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) {
version, err := c.negotiateVersion(createTopics, v0, v1, v2)
if err != nil {
return createTopicsResponse{}, err
}
err := c.writeOperation(
request.v = version
response := createTopicsResponse{v: version}
err = c.writeOperation(
func(deadline time.Time, id int32) error {
if request.Timeout == 0 {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
return c.writeRequest(createTopics, v0, id, request)
return c.writeRequest(createTopics, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
t.toCreateTopicsRequestV0Topic())
}
_, err := c.createTopics(createTopicsRequestV0{
_, err := c.createTopics(createTopicsRequest{
Topics: requestV0Topics,
})
return err
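
`Conn.CreateTopics` likewise keeps its exported signature and now fills in the negotiated version before writing the request. A caller-side sketch, assuming a placeholder broker address and topic settings (in practice the connection should point at the cluster controller):

```go
package main

import (
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker address and topic settings.
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             "events-v1",
		NumPartitions:     3,
		ReplicationFactor: 1,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```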

View File

@@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D
}
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
type deleteTopicsRequestV0 struct {
type deleteTopicsRequest struct {
// Topics holds the topic names
Topics []string
@@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct {
Timeout int32
}
func (t deleteTopicsRequestV0) size() int32 {
func (t deleteTopicsRequest) size() int32 {
return sizeofStringArray(t.Topics) +
sizeofInt32(t.Timeout)
}
func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
func (t deleteTopicsRequest) writeTo(wb *writeBuffer) {
wb.writeStringArray(t.Topics)
wb.writeInt32(t.Timeout)
}
type deleteTopicsResponseV0 struct {
type deleteTopicsResponse struct {
v apiVersion // v0, v1
ThrottleTime int32
// TopicErrorCodes holds per topic error codes
TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
}
func (t deleteTopicsResponseV0) size() int32 {
return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
func (t deleteTopicsResponse) size() int32 {
sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
if t.v >= v1 {
sz += sizeofInt32(t.ThrottleTime)
}
return sz
}
func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
var item deleteTopicsResponseV0TopicErrorCode
if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
return
}
t.TopicErrorCodes = append(t.TopicErrorCodes, item)
return
}
if remain, err = readArrayWith(r, size, fn); err != nil {
remain = size
if t.v >= v1 {
if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
return
}
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}
return
}
func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
func (t deleteTopicsResponse) writeTo(wb *writeBuffer) {
if t.v >= v1 {
wb.writeInt32(t.ThrottleTime)
}
wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
}
@@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
// deleteTopics deletes the specified topics.
//
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
var response deleteTopicsResponseV0
err := c.writeOperation(
func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) {
version, err := c.negotiateVersion(deleteTopics, v0, v1)
if err != nil {
return deleteTopicsResponse{}, err
}
response := deleteTopicsResponse{
v: version,
}
err = c.writeOperation(
func(deadline time.Time, id int32) error {
if request.Timeout == 0 {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
return c.writeRequest(deleteTopics, v0, id, request)
return c.writeRequest(deleteTopics, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
},
)
if err != nil {
return deleteTopicsResponseV0{}, err
return deleteTopicsResponse{}, err
}
for _, c := range response.TopicErrorCodes {
if c.ErrorCode != 0 {

View File

@@ -35,7 +35,7 @@ type DescribeClientQuotasRequestComponent struct {
Match string
}
// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
// DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.
type DescribeClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

View File

@@ -1,32 +0,0 @@
version: "3"
services:
kafka:
image: wurstmeister/kafka:2.12-2.4.1
restart: on-failure:3
links:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
KAFKA_VERSION: '2.4.1'
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: '200000000'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181

View File

@@ -1,29 +0,0 @@
version: "3"
services:
kafka:
image: wurstmeister/kafka:0.10.1.1
links:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: '200000000'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181

View File

@@ -1,34 +1,39 @@
version: "3"
# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
version: '3'
services:
zookeeper:
container_name: zookeeper
hostname: zookeeper
image: bitnami/zookeeper:latest
ports:
- 2181:2181
environment:
ALLOW_ANONYMOUS_LOGIN: yes
kafka:
image: wurstmeister/kafka:2.12-2.3.1
container_name: kafka
image: bitnami/kafka:3.7.0
restart: on-failure:3
links:
- zookeeper
- zookeeper
ports:
- 9092:9092
- 9093:9093
- 9092:9092
- 9093:9093
environment:
KAFKA_VERSION: '2.3.1'
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: '200000000'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181
KAFKA_CFG_BROKER_ID: 1
KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_CFG_ADVERTISED_PORT: '9092'
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
ALLOW_PLAINTEXT_LISTENER: yes
entrypoint:
- "/bin/bash"
- "-c"
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh

View File

@@ -329,6 +329,8 @@ func (e Error) Title() string {
return "Unsupported Compression Type"
case MemberIDRequired:
return "Member ID Required"
case FencedInstanceID:
return "Fenced Instance ID"
case EligibleLeadersNotAvailable:
return "Eligible Leader Not Available"
case ElectionNotNeeded:
@@ -538,6 +540,8 @@ func (e Error) Description() string {
return "the requesting client does not support the compression type of given partition"
case MemberIDRequired:
return "the group member needs to have a valid member id before actually entering a consumer group"
case FencedInstanceID:
return "the broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id"
case EligibleLeadersNotAvailable:
return "eligible topic partition leaders are not available"
case ElectionNotNeeded:
@@ -636,6 +640,7 @@ func coalesceErrors(errs ...error) error {
return nil
}
// MessageTooLargeError is returned when a message is too large to fit within the allowed size.
type MessageTooLargeError struct {
Message Message
Remaining []Message
@@ -655,6 +660,10 @@ func (e MessageTooLargeError) Error() string {
return MessageSizeTooLarge.Error()
}
func (e MessageTooLargeError) Unwrap() error {
return MessageSizeTooLarge
}
func makeError(code int16, message string) error {
if code == 0 {
return nil
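
The new `Unwrap` method lets callers match the `MessageSizeTooLarge` sentinel with `errors.Is` and still pull the undelivered messages out of `MessageTooLargeError` via `errors.As`. A sketch, with the broker address, topic, and payload sizes chosen only to force the error:

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker address and topic; BatchBytes is set low on
	// purpose so the oversized payload below triggers the error.
	w := &kafka.Writer{
		Addr:       kafka.TCP("localhost:9092"),
		Topic:      "events-v1",
		BatchBytes: 1 << 20, // 1 MiB
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Value: make([]byte, 4<<20)}, // deliberately too large
		kafka.Message{Value: []byte("small")},
	)

	// Unwrap makes the sentinel visible to errors.Is ...
	if errors.Is(err, kafka.MessageSizeTooLarge) {
		// ... and errors.As still recovers the messages that were not written.
		var tooLarge kafka.MessageTooLargeError
		if errors.As(err, &tooLarge) {
			log.Printf("skipping oversized message, %d messages left to retry", len(tooLarge.Remaining))
		}
	} else if err != nil {
		log.Fatal(err)
	}
}
```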

View File

@@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
wb.writeBytes(t.ProtocolMetadata)
}
type joinGroupRequestV1 struct {
type joinGroupRequest struct {
// GroupID holds the unique group identifier
GroupID string
@@ -264,7 +264,7 @@ type joinGroupRequestV1 struct {
GroupProtocols []joinGroupRequestGroupProtocolV1
}
func (t joinGroupRequestV1) size() int32 {
func (t joinGroupRequest) size() int32 {
return sizeofString(t.GroupID) +
sizeofInt32(t.SessionTimeout) +
sizeofInt32(t.RebalanceTimeout) +
@@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 {
sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
}
func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
func (t joinGroupRequest) writeTo(wb *writeBuffer) {
wb.writeString(t.GroupID)
wb.writeInt32(t.SessionTimeout)
wb.writeInt32(t.RebalanceTimeout)
@@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
}
type joinGroupResponseMemberV1 struct {
type joinGroupResponseMember struct {
// MemberID assigned by the group coordinator
MemberID string
MemberMetadata []byte
}
func (t joinGroupResponseMemberV1) size() int32 {
func (t joinGroupResponseMember) size() int32 {
return sizeofString(t.MemberID) +
sizeofBytes(t.MemberMetadata)
}
func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
func (t joinGroupResponseMember) writeTo(wb *writeBuffer) {
wb.writeString(t.MemberID)
wb.writeBytes(t.MemberMetadata)
}
func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.MemberID); err != nil {
return
}
@@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain
return
}
type joinGroupResponseV1 struct {
type joinGroupResponse struct {
v apiVersion // v1, v2
ThrottleTime int32
// ErrorCode holds response error code
ErrorCode int16
@@ -323,19 +327,26 @@ type joinGroupResponseV1 struct {
// MemberID assigned by the group coordinator
MemberID string
Members []joinGroupResponseMemberV1
Members []joinGroupResponseMember
}
func (t joinGroupResponseV1) size() int32 {
return sizeofInt16(t.ErrorCode) +
func (t joinGroupResponse) size() int32 {
sz := sizeofInt16(t.ErrorCode) +
sizeofInt32(t.GenerationID) +
sizeofString(t.GroupProtocol) +
sizeofString(t.LeaderID) +
sizeofString(t.MemberID) +
sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
if t.v >= v2 {
sz += sizeofInt32(t.ThrottleTime)
}
return sz
}
func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
func (t joinGroupResponse) writeTo(wb *writeBuffer) {
if t.v >= v2 {
wb.writeInt32(t.ThrottleTime)
}
wb.writeInt16(t.ErrorCode)
wb.writeInt32(t.GenerationID)
wb.writeString(t.GroupProtocol)
@@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
}
func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
remain = size
if t.v >= v2 {
if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil {
return
}
}
if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
return
}
if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
@@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e
}
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var item joinGroupResponseMemberV1
var item joinGroupResponseMember
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}

View File

@@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int,
fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
var item listGroupsResponseGroupV1
if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
return
}
t.Groups = append(t.Groups, item)

View File

@@ -17,7 +17,7 @@ type OffsetRequest struct {
}
// FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
// the parition given as argument.
// the partition given as argument.
func FirstOffsetOf(partition int) OffsetRequest {
return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
}

View File

@@ -19,7 +19,7 @@ type MetadataRequest struct {
Topics []string
}
// MetadatResponse represents a response from a kafka broker to a metadata
// MetadataResponse represents a response from a kafka broker to a metadata
// request.
type MetadataResponse struct {
// The amount of time that the broker throttled the request.

View File

@@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := offsetFetchResponseV1PartitionResponse{}
if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}
t.PartitionResponses = append(t.PartitionResponses, item)

View File

@@ -191,7 +191,7 @@ func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) {
// Reconstruct the prefix that we had to read to determine the version
// of the record set from the magic byte.
//
// Technically this may recurisvely stack readers when consuming all
// Technically this may recursively stack readers when consuming all
// items of the batch, which could hurt performance. In practice this
// path should not be taken though, since the decoder would read from a
// *bufio.Reader which implements the bufferedReader interface.
@@ -304,7 +304,7 @@ type RawRecordSet struct {
// then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.
//
// Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a
// performance standpoint as it require an extra copy of the record bytes. Holding off
// performance standpoint as it requires an extra copy of the record bytes. Holding off
// on optimizing, as this code path is only invoked in tests.
func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) {
rs := &RecordSet{}

View File

@@ -469,9 +469,11 @@ type ReaderConfig struct {
JoinGroupBackoff time.Duration
// RetentionTime optionally sets the length of time the consumer group will be saved
// by the broker
// by the broker. -1 will disable the setting and leave the
// retention up to the broker's offsets.retention.minutes property. By
// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
//
// Default: 24h
// Default: -1
//
// Only used when GroupID is set
RetentionTime time.Duration
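
With the default now -1, an unset `RetentionTime` defers to the broker's `offsets.retention.minutes`; set the field only to override that. A sketch of a group reader, with broker, group, and topic names as placeholders:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder broker address, group ID, and topic. Leaving RetentionTime
	// unset defers to the broker's offsets.retention.minutes; the explicit
	// value below is only an example of overriding it.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:       []string{"localhost:9092"},
		GroupID:       "example-group",
		Topic:         "events-v1",
		RetentionTime: 7 * 24 * time.Hour,
	})
	defer r.Close()

	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("partition=%d offset=%d key=%s\n", msg.Partition, msg.Offset, msg.Key)
}
```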

View File

@@ -35,7 +35,7 @@ type Record = protocol.Record
// RecordReader values are not safe to use concurrently from multiple goroutines.
type RecordReader = protocol.RecordReader
// NewRecordReade reconstructs a RecordSet which exposes the sequence of records
// NewRecordReader reconstructs a RecordSet which exposes the sequence of records
// passed as arguments.
func NewRecordReader(records ...Record) RecordReader {
return protocol.NewRecordReader(records...)

View File

@@ -635,7 +635,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
}
}
// We use int32 here to half the memory footprint (compared to using int
// We use int32 here to halve the memory footprint (compared to using int
// on 64 bits architectures). We map lists of the message indexes instead
// of the message values for the same reason, int32 is 4 bytes, vs a full
// Message value which is 100+ bytes and contains pointers and contributes

4
vendor/modules.txt vendored
View File

@@ -1202,7 +1202,7 @@ github.com/open-policy-agent/opa/v1/version
# github.com/opencloud-eu/libre-graph-api-go v1.0.5
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d
# github.com/opencloud-eu/reva/v2 v2.33.0
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -1713,7 +1713,7 @@ github.com/russellhaering/goxmldsig/types
# github.com/russross/blackfriday/v2 v2.1.0
## explicit
github.com/russross/blackfriday/v2
# github.com/segmentio/kafka-go v0.4.47
# github.com/segmentio/kafka-go v0.4.48
## explicit; go 1.15
github.com/segmentio/kafka-go
github.com/segmentio/kafka-go/compress