From 13049b7b9dcf33a5e0d306f36581bc2daf99f1ea Mon Sep 17 00:00:00 2001 From: Viktor Scharf Date: Mon, 19 May 2025 16:39:56 +0200 Subject: [PATCH] chore:reva bump v.2.33 (#884) --- go.mod | 4 +- go.sum | 8 +- vendor/github.com/segmentio/kafka-go/Makefile | 2 +- .../github.com/segmentio/kafka-go/README.md | 7 +- vendor/github.com/segmentio/kafka-go/batch.go | 2 +- vendor/github.com/segmentio/kafka-go/conn.go | 19 ++-- .../segmentio/kafka-go/consumergroup.go | 20 ++-- .../segmentio/kafka-go/createtopics.go | 102 +++++++++++++----- .../segmentio/kafka-go/deletetopics.go | 54 +++++++--- .../kafka-go/describeclientquotas.go | 2 +- .../segmentio/kafka-go/docker-compose-241.yml | 32 ------ .../segmentio/kafka-go/docker-compose.010.yml | 29 ----- .../segmentio/kafka-go/docker-compose.yml | 61 ++++++----- vendor/github.com/segmentio/kafka-go/error.go | 9 ++ .../segmentio/kafka-go/joingroup.go | 47 +++++--- .../segmentio/kafka-go/listgroups.go | 2 +- .../segmentio/kafka-go/listoffset.go | 2 +- .../github.com/segmentio/kafka-go/metadata.go | 2 +- .../segmentio/kafka-go/offsetfetch.go | 2 +- .../segmentio/kafka-go/protocol/record.go | 4 +- .../github.com/segmentio/kafka-go/reader.go | 6 +- .../github.com/segmentio/kafka-go/record.go | 2 +- .../github.com/segmentio/kafka-go/writer.go | 2 +- vendor/modules.txt | 4 +- 24 files changed, 241 insertions(+), 183 deletions(-) delete mode 100644 vendor/github.com/segmentio/kafka-go/docker-compose-241.yml delete mode 100644 vendor/github.com/segmentio/kafka-go/docker-compose.010.yml diff --git a/go.mod b/go.mod index 60705a38a..96a71e960 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/onsi/gomega v1.37.0 github.com/open-policy-agent/opa v1.4.2 github.com/opencloud-eu/libre-graph-api-go v1.0.5 - github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d + github.com/opencloud-eu/reva/v2 v2.33.0 github.com/orcaman/concurrent-map v1.0.0 github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.10 @@ -285,7 +285,7 @@ require ( github.com/rs/xid v1.6.0 // indirect github.com/russellhaering/goxmldsig v1.5.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/segmentio/kafka-go v0.4.47 // indirect + github.com/segmentio/kafka-go v0.4.48 // indirect github.com/segmentio/ksuid v1.0.4 // indirect github.com/sercand/kuberesolver/v5 v5.1.1 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect diff --git a/go.sum b/go.sum index 5630628a3..8afbb09d7 100644 --- a/go.sum +++ b/go.sum @@ -862,8 +862,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527 github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY= github.com/opencloud-eu/libre-graph-api-go v1.0.5 h1:Wv09oIjCF8zRN8roPzjXXo6ORp2h87/YhmdXE9N4p/A= github.com/opencloud-eu/libre-graph-api-go v1.0.5/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q= -github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d h1:c7AGNgYPm4Ix3YU1vptCN01HjZ1ZpRw91QrqJnSEkxM= -github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d/go.mod h1:moFklKM4+TwF8iqeFnX64/8TlbqFSsfAkV30Q0FCTt4= +github.com/opencloud-eu/reva/v2 v2.33.0 h1:uFbt4BC21gU0bbrp4CHABhWR4Xk5H+e2kA7KrEUXGEQ= +github.com/opencloud-eu/reva/v2 v2.33.0/go.mod h1:wRZ/7eJTOfkhjtDlYcNt83Loz2drfXssh+5JMDcq/5o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -989,8 +989,8 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/sacloud/libsacloud v1.36.2/go.mod h1:P7YAOVmnIn3DKHqCZcUKYUXmSwGBm3yS7IBEjKVSrjg= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.7.0.20210127161313-bd30bebeac4f/go.mod h1:CJJ5VAbozOl0yEw7nHB9+7BXTJbIn6h7W+f6Gau5IP8= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= -github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.48 h1:9jyu9CWK4W5W+SroCe8EffbrRZVqAOkuaLd/ApID4Vs= +github.com/segmentio/kafka-go v0.4.48/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= github.com/sercand/kuberesolver/v5 v5.1.1 h1:CYH+d67G0sGBj7q5wLK61yzqJJ8gLLC8aeprPTHb6yY= diff --git a/vendor/github.com/segmentio/kafka-go/Makefile b/vendor/github.com/segmentio/kafka-go/Makefile index e2374f2e3..47f45c950 100644 --- a/vendor/github.com/segmentio/kafka-go/Makefile +++ b/vendor/github.com/segmentio/kafka-go/Makefile @@ -4,4 +4,4 @@ test: go test -race -cover ./... docker: - docker-compose up -d + docker compose up -d diff --git a/vendor/github.com/segmentio/kafka-go/README.md b/vendor/github.com/segmentio/kafka-go/README.md index e17878825..4e6cd1229 100644 --- a/vendor/github.com/segmentio/kafka-go/README.md +++ b/vendor/github.com/segmentio/kafka-go/README.md @@ -108,7 +108,7 @@ if err := conn.Close(); err != nil { ``` ### To Create Topics -By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_AUTO_CREATE_TOPICS_ENABLE='true'` in the wurstmeister/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so: +By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'` in the bitnami/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so: ```go // to create topics when auto.create.topics.enable='true' conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0) @@ -797,3 +797,8 @@ KAFKA_VERSION=2.3.1 \ KAFKA_SKIP_NETTEST=1 \ go test -race ./... ``` + +(or) to clean up the cached test results and run tests: +``` +go clean -cache && make test +``` diff --git a/vendor/github.com/segmentio/kafka-go/batch.go b/vendor/github.com/segmentio/kafka-go/batch.go index 19dcef8cd..eb742712d 100644 --- a/vendor/github.com/segmentio/kafka-go/batch.go +++ b/vendor/github.com/segmentio/kafka-go/batch.go @@ -46,7 +46,7 @@ func (batch *Batch) Throttle() time.Duration { return batch.throttle } -// Watermark returns the current highest watermark in a partition. +// HighWaterMark returns the current highest watermark in a partition. func (batch *Batch) HighWaterMark() int64 { return batch.highWaterMark } diff --git a/vendor/github.com/segmentio/kafka-go/conn.go b/vendor/github.com/segmentio/kafka-go/conn.go index 2b51afbd5..9f9f25903 100644 --- a/vendor/github.com/segmentio/kafka-go/conn.go +++ b/vendor/github.com/segmentio/kafka-go/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequest{ Topics: topics, }) return err @@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) { - var response joinGroupResponseV1 +func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { + version, err := c.negotiateVersion(joinGroup, v1, v2) + if err != nil { + return joinGroupResponse{}, err + } - err := c.writeOperation( + response := joinGroupResponse{v: version} + + err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v1, id, request) + return c.writeRequest(joinGroup, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error }, ) if err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV1{}, Error(response.ErrorCode) + return joinGroupResponse{}, Error(response.ErrorCode) } return response, nil diff --git a/vendor/github.com/segmentio/kafka-go/consumergroup.go b/vendor/github.com/segmentio/kafka-go/consumergroup.go index b9d0a7e2e..b32f90162 100644 --- a/vendor/github.com/segmentio/kafka-go/consumergroup.go +++ b/vendor/github.com/segmentio/kafka-go/consumergroup.go @@ -527,7 +527,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { case err == nil, errors.Is(err, UnknownTopicOrPartition): if len(ops) != oParts { g.log(func(l Logger) { - l.Printf("Partition changes found, reblancing group: %v.", g.GroupID) + l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID) }) return } @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroup(joinGroupRequest) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } return t.conn.joinGroup(req) } @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequest(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) { + request := joinGroupRequest{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque // assignTopicPartitions uses the selected GroupBalancer to assign members to // their various partitions. -func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) { +func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) { cg.withLogger(func(l Logger) { l.Printf("selected as leader for group, %s\n", cg.config.ID) }) @@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember. -func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) { +func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) { members := make([]GroupMember, 0, len(in)) for _, item := range in { metadata := groupMetadata{} diff --git a/vendor/github.com/segmentio/kafka-go/createtopics.go b/vendor/github.com/segmentio/kafka-go/createtopics.go index 8ad9ebf44..9c75d7aaa 100644 --- a/vendor/github.com/segmentio/kafka-go/createtopics.go +++ b/vendor/github.com/segmentio/kafka-go/createtopics.go @@ -10,7 +10,7 @@ import ( "github.com/segmentio/kafka-go/protocol/createtopics" ) -// CreateTopicRequests represents a request sent to a kafka broker to create +// CreateTopicsRequest represents a request sent to a kafka broker to create // new topics. type CreateTopicsRequest struct { // Address of the kafka broker to send the request to. @@ -27,7 +27,7 @@ type CreateTopicsRequest struct { ValidateOnly bool } -// CreateTopicResponse represents a response from a kafka broker to a topic +// CreateTopicsResponse represents a response from a kafka broker to a topic // creation request. type CreateTopicsResponse struct { // The amount of time that the broker throttled the request. @@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV0 struct { +type createTopicsRequest struct { + v apiVersion // v0, v1, v2 + // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -270,86 +272,136 @@ type createTopicsRequestV0 struct { // Timeout ms to wait for a topic to be completely created on the // controller node. Values <= 0 will trigger topic creation and return immediately Timeout int32 + + // If true, check that the topics can be created as specified, but don't create anything. + // Internal use only for Kafka 4.0 support. + ValidateOnly bool } -func (t createTopicsRequestV0) size() int32 { - return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + +func (t createTopicsRequest) size() int32 { + sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + sizeofInt32(t.Timeout) + if t.v >= v1 { + sz += 1 + } + return sz } -func (t createTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t createTopicsRequest) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) + if t.v >= v1 { + wb.writeBool(t.ValidateOnly) + } } -type createTopicsResponseV0TopicError struct { +type createTopicsResponseTopicError struct { + v apiVersion + // Topic name Topic string // ErrorCode holds response error code ErrorCode int16 + + // ErrorMessage holds response error message string + ErrorMessage string } -func (t createTopicsResponseV0TopicError) size() int32 { - return sizeofString(t.Topic) + +func (t createTopicsResponseTopicError) size() int32 { + sz := sizeofString(t.Topic) + sizeofInt16(t.ErrorCode) + if t.v >= v1 { + sz += sizeofString(t.ErrorMessage) + } + return sz } -func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) + if t.v >= v1 { + wb.writeString(t.ErrorMessage) + } } -func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } + if t.v >= v1 { + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } + } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV0 struct { - TopicErrors []createTopicsResponseV0TopicError +type createTopicsResponse struct { + v apiVersion + + ThrottleTime int32 // v2+ + TopicErrors []createTopicsResponseTopicError } -func (t createTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t createTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t createTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV0TopicError - if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil { + topic := createTopicsResponseTopicError{v: t.v} + if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) { - var response createTopicsResponseV0 +func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { + version, err := c.negotiateVersion(createTopics, v0, v1, v2) + if err != nil { + return createTopicsResponse{}, err + } - err := c.writeOperation( + request.v = version + response := createTopicsResponse{v: version} + + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v0, id, request) + return c.writeRequest(createTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV0{ + _, err := c.createTopics(createTopicsRequest{ Topics: requestV0Topics, }) return err diff --git a/vendor/github.com/segmentio/kafka-go/deletetopics.go b/vendor/github.com/segmentio/kafka-go/deletetopics.go index d758d9fd6..ff73d553b 100644 --- a/vendor/github.com/segmentio/kafka-go/deletetopics.go +++ b/vendor/github.com/segmentio/kafka-go/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequest struct { // Topics holds the topic names Topics []string @@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequest) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequest) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV0 struct { +type deleteTopicsResponse struct { + v apiVersion // v0, v1 + + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) { - var response deleteTopicsResponseV0 - err := c.writeOperation( +func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) { + version, err := c.negotiateVersion(deleteTopics, v0, v1) + if err != nil { + return deleteTopicsResponse{}, err + } + + response := deleteTopicsResponse{ + v: version, + } + + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v0, id, request) + return c.writeRequest(deleteTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV0{}, err + return deleteTopicsResponse{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/vendor/github.com/segmentio/kafka-go/describeclientquotas.go b/vendor/github.com/segmentio/kafka-go/describeclientquotas.go index 6291dcd98..bfe712f28 100644 --- a/vendor/github.com/segmentio/kafka-go/describeclientquotas.go +++ b/vendor/github.com/segmentio/kafka-go/describeclientquotas.go @@ -35,7 +35,7 @@ type DescribeClientQuotasRequestComponent struct { Match string } -// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request. +// DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request. type DescribeClientQuotasResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration diff --git a/vendor/github.com/segmentio/kafka-go/docker-compose-241.yml b/vendor/github.com/segmentio/kafka-go/docker-compose-241.yml deleted file mode 100644 index 6feb1844b..000000000 --- a/vendor/github.com/segmentio/kafka-go/docker-compose-241.yml +++ /dev/null @@ -1,32 +0,0 @@ -version: "3" -services: - kafka: - image: wurstmeister/kafka:2.12-2.4.1 - restart: on-failure:3 - links: - - zookeeper - ports: - - 9092:9092 - - 9093:9093 - environment: - KAFKA_VERSION: '2.4.1' - KAFKA_BROKER_ID: '1' - KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' - KAFKA_DELETE_TOPIC_ENABLE: 'true' - KAFKA_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_ADVERTISED_PORT: '9092' - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_MESSAGE_MAX_BYTES: '200000000' - KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' - KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" - CUSTOM_INIT_SCRIPT: |- - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; - /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram - - zookeeper: - image: wurstmeister/zookeeper - ports: - - 2181:2181 diff --git a/vendor/github.com/segmentio/kafka-go/docker-compose.010.yml b/vendor/github.com/segmentio/kafka-go/docker-compose.010.yml deleted file mode 100644 index 56123f85c..000000000 --- a/vendor/github.com/segmentio/kafka-go/docker-compose.010.yml +++ /dev/null @@ -1,29 +0,0 @@ -version: "3" -services: - kafka: - image: wurstmeister/kafka:0.10.1.1 - links: - - zookeeper - ports: - - 9092:9092 - - 9093:9093 - environment: - KAFKA_BROKER_ID: '1' - KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' - KAFKA_DELETE_TOPIC_ENABLE: 'true' - KAFKA_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_ADVERTISED_PORT: '9092' - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_MESSAGE_MAX_BYTES: '200000000' - KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' - KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" - CUSTOM_INIT_SCRIPT: |- - echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; - - zookeeper: - image: wurstmeister/zookeeper - ports: - - 2181:2181 diff --git a/vendor/github.com/segmentio/kafka-go/docker-compose.yml b/vendor/github.com/segmentio/kafka-go/docker-compose.yml index dc0c2e85e..dffb0e448 100644 --- a/vendor/github.com/segmentio/kafka-go/docker-compose.yml +++ b/vendor/github.com/segmentio/kafka-go/docker-compose.yml @@ -1,34 +1,39 @@ -version: "3" +# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list. +version: '3' services: + zookeeper: + container_name: zookeeper + hostname: zookeeper + image: bitnami/zookeeper:latest + ports: + - 2181:2181 + environment: + ALLOW_ANONYMOUS_LOGIN: yes kafka: - image: wurstmeister/kafka:2.12-2.3.1 + container_name: kafka + image: bitnami/kafka:3.7.0 restart: on-failure:3 links: - - zookeeper + - zookeeper ports: - - 9092:9092 - - 9093:9093 + - 9092:9092 + - 9093:9093 environment: - KAFKA_VERSION: '2.3.1' - KAFKA_BROKER_ID: '1' - KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1' - KAFKA_DELETE_TOPIC_ENABLE: 'true' - KAFKA_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_ADVERTISED_PORT: '9092' - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_MESSAGE_MAX_BYTES: '200000000' - KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' - KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' - KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" - CUSTOM_INIT_SCRIPT: |- - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; - /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram - - zookeeper: - image: wurstmeister/zookeeper - ports: - - 2181:2181 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_ADVERTISED_PORT: '9092' + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + ALLOW_PLAINTEXT_LISTENER: yes + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh diff --git a/vendor/github.com/segmentio/kafka-go/error.go b/vendor/github.com/segmentio/kafka-go/error.go index 4a7a8a278..300a1412f 100644 --- a/vendor/github.com/segmentio/kafka-go/error.go +++ b/vendor/github.com/segmentio/kafka-go/error.go @@ -329,6 +329,8 @@ func (e Error) Title() string { return "Unsupported Compression Type" case MemberIDRequired: return "Member ID Required" + case FencedInstanceID: + return "Fenced Instance ID" case EligibleLeadersNotAvailable: return "Eligible Leader Not Available" case ElectionNotNeeded: @@ -538,6 +540,8 @@ func (e Error) Description() string { return "the requesting client does not support the compression type of given partition" case MemberIDRequired: return "the group member needs to have a valid member id before actually entering a consumer group" + case FencedInstanceID: + return "the broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id" case EligibleLeadersNotAvailable: return "eligible topic partition leaders are not available" case ElectionNotNeeded: @@ -636,6 +640,7 @@ func coalesceErrors(errs ...error) error { return nil } +// MessageTooLargeError is returned when a message is too large to fit within the allowed size. type MessageTooLargeError struct { Message Message Remaining []Message @@ -655,6 +660,10 @@ func (e MessageTooLargeError) Error() string { return MessageSizeTooLarge.Error() } +func (e MessageTooLargeError) Unwrap() error { + return MessageSizeTooLarge +} + func makeError(code int16, message string) error { if code == 0 { return nil diff --git a/vendor/github.com/segmentio/kafka-go/joingroup.go b/vendor/github.com/segmentio/kafka-go/joingroup.go index 30823a69a..f3d90a937 100644 --- a/vendor/github.com/segmentio/kafka-go/joingroup.go +++ b/vendor/github.com/segmentio/kafka-go/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequest struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequest) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequest) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) }) } -type joinGroupResponseMemberV1 struct { +type joinGroupResponseMember struct { // MemberID assigned by the group coordinator MemberID string MemberMetadata []byte } -func (t joinGroupResponseMemberV1) size() int32 { +func (t joinGroupResponseMember) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberMetadata) } -func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponseMember) writeTo(wb *writeBuffer) { wb.writeString(t.MemberID) wb.writeBytes(t.MemberMetadata) } -func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain return } -type joinGroupResponseV1 struct { +type joinGroupResponse struct { + v apiVersion // v1, v2 + + ThrottleTime int32 + // ErrorCode holds response error code ErrorCode int16 @@ -323,19 +327,26 @@ type joinGroupResponseV1 struct { // MemberID assigned by the group coordinator MemberID string - Members []joinGroupResponseMemberV1 + Members []joinGroupResponseMember } -func (t joinGroupResponseV1) size() int32 { - return sizeofInt16(t.ErrorCode) + +func (t joinGroupResponse) size() int32 { + sz := sizeofInt16(t.ErrorCode) + sizeofInt32(t.GenerationID) + sizeofString(t.GroupProtocol) + sizeofString(t.LeaderID) + sizeofString(t.MemberID) + sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeInt16(t.ErrorCode) wb.writeInt32(t.GenerationID) wb.writeString(t.GroupProtocol) @@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) }) } -func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { +func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } if remain, err = readInt32(r, remain, &t.GenerationID); err != nil { @@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var item joinGroupResponseMemberV1 + var item joinGroupResponseMember if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } diff --git a/vendor/github.com/segmentio/kafka-go/listgroups.go b/vendor/github.com/segmentio/kafka-go/listgroups.go index 229de9352..5034b5440 100644 --- a/vendor/github.com/segmentio/kafka-go/listgroups.go +++ b/vendor/github.com/segmentio/kafka-go/listgroups.go @@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item listGroupsResponseGroupV1 - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.Groups = append(t.Groups, item) diff --git a/vendor/github.com/segmentio/kafka-go/listoffset.go b/vendor/github.com/segmentio/kafka-go/listoffset.go index 11c5d04b4..97779cecf 100644 --- a/vendor/github.com/segmentio/kafka-go/listoffset.go +++ b/vendor/github.com/segmentio/kafka-go/listoffset.go @@ -17,7 +17,7 @@ type OffsetRequest struct { } // FirstOffsetOf constructs an OffsetRequest which asks for the first offset of -// the parition given as argument. +// the partition given as argument. func FirstOffsetOf(partition int) OffsetRequest { return OffsetRequest{Partition: partition, Timestamp: FirstOffset} } diff --git a/vendor/github.com/segmentio/kafka-go/metadata.go b/vendor/github.com/segmentio/kafka-go/metadata.go index 429a6a260..d151071b3 100644 --- a/vendor/github.com/segmentio/kafka-go/metadata.go +++ b/vendor/github.com/segmentio/kafka-go/metadata.go @@ -19,7 +19,7 @@ type MetadataRequest struct { Topics []string } -// MetadatResponse represents a response from a kafka broker to a metadata +// MetadataResponse represents a response from a kafka broker to a metadata // request. type MetadataResponse struct { // The amount of time that the broker throttled the request. diff --git a/vendor/github.com/segmentio/kafka-go/offsetfetch.go b/vendor/github.com/segmentio/kafka-go/offsetfetch.go index b85bc5c83..ce80213f8 100644 --- a/vendor/github.com/segmentio/kafka-go/offsetfetch.go +++ b/vendor/github.com/segmentio/kafka-go/offsetfetch.go @@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { item := offsetFetchResponseV1PartitionResponse{} - if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { + if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } t.PartitionResponses = append(t.PartitionResponses, item) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/record.go b/vendor/github.com/segmentio/kafka-go/protocol/record.go index e11af4dcc..c7987c390 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/record.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/record.go @@ -191,7 +191,7 @@ func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) { // Reconstruct the prefix that we had to read to determine the version // of the record set from the magic byte. // - // Technically this may recurisvely stack readers when consuming all + // Technically this may recursively stack readers when consuming all // items of the batch, which could hurt performance. In practice this // path should not be taken tho, since the decoder would read from a // *bufio.Reader which implements the bufferedReader interface. @@ -304,7 +304,7 @@ type RawRecordSet struct { // then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet. // // Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a -// performance standpoint as it require an extra copy of the record bytes. Holding off +// performance standpoint as it requires an extra copy of the record bytes. Holding off // on optimizing, as this code path is only invoked in tests. func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) { rs := &RecordSet{} diff --git a/vendor/github.com/segmentio/kafka-go/reader.go b/vendor/github.com/segmentio/kafka-go/reader.go index cfc7cb8f5..04d90f355 100644 --- a/vendor/github.com/segmentio/kafka-go/reader.go +++ b/vendor/github.com/segmentio/kafka-go/reader.go @@ -469,9 +469,11 @@ type ReaderConfig struct { JoinGroupBackoff time.Duration // RetentionTime optionally sets the length of time the consumer group will be saved - // by the broker + // by the broker. -1 will disable the setting and leave the + // retention up to the broker's offsets.retention.minutes property. By + // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0. // - // Default: 24h + // Default: -1 // // Only used when GroupID is set RetentionTime time.Duration diff --git a/vendor/github.com/segmentio/kafka-go/record.go b/vendor/github.com/segmentio/kafka-go/record.go index 1750889ac..8f8f7bd92 100644 --- a/vendor/github.com/segmentio/kafka-go/record.go +++ b/vendor/github.com/segmentio/kafka-go/record.go @@ -35,7 +35,7 @@ type Record = protocol.Record // RecordReader values are not safe to use concurrently from multiple goroutines. type RecordReader = protocol.RecordReader -// NewRecordReade reconstructs a RecordSet which exposes the sequence of records +// NewRecordReader reconstructs a RecordSet which exposes the sequence of records // passed as arguments. func NewRecordReader(records ...Record) RecordReader { return protocol.NewRecordReader(records...) diff --git a/vendor/github.com/segmentio/kafka-go/writer.go b/vendor/github.com/segmentio/kafka-go/writer.go index 3c7af907a..3817bf538 100644 --- a/vendor/github.com/segmentio/kafka-go/writer.go +++ b/vendor/github.com/segmentio/kafka-go/writer.go @@ -635,7 +635,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { } } - // We use int32 here to half the memory footprint (compared to using int + // We use int32 here to halve the memory footprint (compared to using int // on 64 bits architectures). We map lists of the message indexes instead // of the message values for the same reason, int32 is 4 bytes, vs a full // Message value which is 100+ bytes and contains pointers and contributes diff --git a/vendor/modules.txt b/vendor/modules.txt index e94dbc772..6e57adac9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1202,7 +1202,7 @@ github.com/open-policy-agent/opa/v1/version # github.com/opencloud-eu/libre-graph-api-go v1.0.5 ## explicit; go 1.18 github.com/opencloud-eu/libre-graph-api-go -# github.com/opencloud-eu/reva/v2 v2.32.1-0.20250515093940-2fb4f836b59d +# github.com/opencloud-eu/reva/v2 v2.33.0 ## explicit; go 1.24.1 github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace github.com/opencloud-eu/reva/v2/cmd/revad/runtime @@ -1713,7 +1713,7 @@ github.com/russellhaering/goxmldsig/types # github.com/russross/blackfriday/v2 v2.1.0 ## explicit github.com/russross/blackfriday/v2 -# github.com/segmentio/kafka-go v0.4.47 +# github.com/segmentio/kafka-go v0.4.48 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress