mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-08 04:20:59 -05:00
127 lines
3.6 KiB
Go
127 lines
3.6 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/describeclientquotas"
|
|
)
|
|
|
|
// DescribeClientQuotasRequest represents a request sent to a kafka broker to
|
|
// describe client quotas.
|
|
type DescribeClientQuotasRequest struct {
|
|
// Address of the kafka broker to send the request to
|
|
Addr net.Addr
|
|
|
|
// List of quota components to describe.
|
|
Components []DescribeClientQuotasRequestComponent
|
|
|
|
// Whether the match is strict, i.e. should exclude entities with
|
|
// unspecified entity types.
|
|
Strict bool
|
|
}
|
|
|
|
type DescribeClientQuotasRequestComponent struct {
|
|
// The entity type that the filter component applies to.
|
|
EntityType string
|
|
|
|
// How to match the entity (0 = exact name, 1 = default name,
|
|
// 2 = any specified name).
|
|
MatchType int8
|
|
|
|
// The string to match against, or null if unused for the match type.
|
|
Match string
|
|
}
|
|
|
|
// DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.
|
|
type DescribeClientQuotasResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// Error is set to a non-nil value including the code and message if a top-level
|
|
// error was encountered when doing the update.
|
|
Error error
|
|
|
|
// List of describe client quota responses.
|
|
Entries []DescribeClientQuotasResponseQuotas
|
|
}
|
|
|
|
type DescribeClientQuotasEntity struct {
|
|
// The quota entity type.
|
|
EntityType string
|
|
|
|
// The name of the quota entity, or null if the default.
|
|
EntityName string
|
|
}
|
|
|
|
type DescribeClientQuotasValue struct {
|
|
// The quota configuration key.
|
|
Key string
|
|
|
|
// The quota configuration value.
|
|
Value float64
|
|
}
|
|
|
|
type DescribeClientQuotasResponseQuotas struct {
|
|
// List of client quota entities and their descriptions.
|
|
Entities []DescribeClientQuotasEntity
|
|
|
|
// The client quota configuration values.
|
|
Values []DescribeClientQuotasValue
|
|
}
|
|
|
|
// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns
|
|
// the response.
|
|
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
|
|
components := make([]describeclientquotas.Component, len(req.Components))
|
|
|
|
for componentIdx, component := range req.Components {
|
|
components[componentIdx] = describeclientquotas.Component{
|
|
EntityType: component.EntityType,
|
|
MatchType: component.MatchType,
|
|
Match: component.Match,
|
|
}
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{
|
|
Components: components,
|
|
Strict: req.Strict,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err)
|
|
}
|
|
|
|
res := m.(*describeclientquotas.Response)
|
|
responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries))
|
|
|
|
for responseEntryIdx, responseEntry := range res.Entries {
|
|
responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities))
|
|
for responseEntityIdx, responseEntity := range responseEntry.Entities {
|
|
responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{
|
|
EntityType: responseEntity.EntityType,
|
|
EntityName: responseEntity.EntityName,
|
|
}
|
|
}
|
|
|
|
responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values))
|
|
for responseValueIdx, responseValue := range responseEntry.Values {
|
|
responseValues[responseValueIdx] = DescribeClientQuotasValue{
|
|
Key: responseValue.Key,
|
|
Value: responseValue.Value,
|
|
}
|
|
}
|
|
responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{
|
|
Entities: responseEntities,
|
|
Values: responseValues,
|
|
}
|
|
}
|
|
ret := &DescribeClientQuotasResponse{
|
|
Throttle: time.Duration(res.ThrottleTimeMs),
|
|
Entries: responseEntries,
|
|
}
|
|
|
|
return ret, nil
|
|
}
|