Commit 5bc02cbc authored by Alex Hong
Browse files

pubsub: fix Ack/ModAck overhead calculation

Previously, we assumed a constant 100 bytes of overhead
for the subscription name and proto encoding instead of
calculating it.

This change fixes that by properly including the
overhead of serializing the subscription field
when calling `splitRequestIDs`.

Fixes #1441

Change-Id: I71755f6b3179adf27729b219261c1d8cda4d4a6c
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44450

Reviewed-by: Emmanuel Odeke <[email protected]>
Reviewed-by: David Symonds <[email protected]>
Reviewed-by: Jean de Klerk <[email protected]>
parent 6f3ae29f
......@@ -22,6 +22,7 @@ import (
vkit "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/internal/distribution"
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
......@@ -375,7 +376,9 @@ func (it *messageIterator) handleKeepAlives() {
}
func (it *messageIterator) sendAck(m map[string]bool) bool {
return it.sendAckIDRPC(m, func(ids []string) error {
// Account for the Subscription field.
overhead := calcFieldSizeString(it.subName)
return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error {
recordStat(it.ctx, AckCount, int64(len(ids)))
addAcks(ids)
// Use context.Background() as the call's context, not it.ctx. We don't
......@@ -392,13 +395,16 @@ func (it *messageIterator) sendAck(m map[string]bool) bool {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers.
func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration) bool {
return it.sendAckIDRPC(m, func(ids []string) error {
deadlineSec := int32(deadline / time.Second)
// Account for the Subscription and AckDeadlineSeconds fields.
overhead := calcFieldSizeString(it.subName) + calcFieldSizeInt(int(deadlineSec))
return it.sendAckIDRPC(m, maxPayload-overhead, func(ids []string) error {
if deadline == 0 {
recordStat(it.ctx, NackCount, int64(len(ids)))
} else {
recordStat(it.ctx, ModAckCount, int64(len(ids)))
}
addModAcks(ids, int32(deadline/time.Second))
addModAcks(ids, deadlineSec)
// Retry this RPC on Unavailable for a short amount of time, then give up
// without returning a fatal error. The utility of this RPC is by nature
// transient (since the deadline is relative to the current time) and it
......@@ -414,7 +420,7 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration)
for {
err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: int32(deadline / time.Second),
AckDeadlineSeconds: deadlineSec,
AckIds: ids,
})
switch status.Code(err) {
......@@ -436,14 +442,14 @@ func (it *messageIterator) sendModAck(m map[string]bool, deadline time.Duration)
})
}
func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, call func([]string) error) bool {
func (it *messageIterator) sendAckIDRPC(ackIDSet map[string]bool, maxSize int, call func([]string) error) bool {
ackIDs := make([]string, 0, len(ackIDSet))
for k := range ackIDSet {
ackIDs = append(ackIDs, k)
}
var toSend []string
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, len(it.subName), maxPayload)
toSend, ackIDs = splitRequestIDs(ackIDs, maxSize)
if err := call(toSend); err != nil {
// The underlying client handles retries, so any error is fatal to the
// iterator.
......@@ -465,11 +471,35 @@ func (it *messageIterator) pingStream() {
_ = it.ps.Send(&pb.StreamingPullRequest{})
}
func splitRequestIDs(ids []string, overHeadLength, maxSize int) (prefix, remainder []string) {
size := overHeadLength
// calcFieldSizeString reports how many bytes the given string fields occupy
// once encoded in a proto message. Each field is counted as one tag byte,
// a varint holding the string's length, and the bytes of the string itself.
func calcFieldSizeString(fields ...string) int {
	total := 0
	for i := range fields {
		n := len(fields[i])
		total += 1 + n + proto.SizeVarint(uint64(n))
	}
	return total
}
// calcFieldSizeInt reports how many bytes the given int fields occupy once
// encoded in a proto message. Each field is counted as one tag byte plus the
// varint encoding of its value converted to uint64.
func calcFieldSizeInt(fields ...int) int {
	total := 0
	for i := range fields {
		total += proto.SizeVarint(uint64(fields[i])) + 1
	}
	return total
}
// splitRequestIDs takes a slice of ackIDs and returns two slices such that the first
// ackID slice can be used in a request where the payload does not exceed maxSize.
func splitRequestIDs(ids []string, maxSize int) (prefix, remainder []string) {
size := 0
i := 0
// TODO(hongalex): Use binary search to find split index, since ackIDs are
// fairly constant.
for size < maxSize && i < len(ids) {
size += overheadPerID + len(ids[i])
size += calcFieldSizeString(ids[i])
i++
}
if size > maxSize {
......
......@@ -15,6 +15,7 @@
package pubsub
import (
"bytes"
"context"
"errors"
"fmt"
......@@ -27,6 +28,7 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
......@@ -41,7 +43,6 @@ var (
func TestSplitRequestIDs(t *testing.T) {
t.Parallel()
ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
subName := "s"
for _, test := range []struct {
ids []string
splitIndex int
......@@ -50,7 +51,7 @@ func TestSplitRequestIDs(t *testing.T) {
{ids, 2},
{ids[:2], 2},
} {
got1, got2 := splitRequestIDs(test.ids, len(subName), 15)
got1, got2 := splitRequestIDs(test.ids, 15)
want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
if !testutil.Equal(got1, want1) {
t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
......@@ -61,6 +62,51 @@ func TestSplitRequestIDs(t *testing.T) {
}
}
// TestCalcFieldSize checks calcFieldSizeString and calcFieldSizeInt against
// hand-computed encoded sizes for mock ack and modack requests.
func TestCalcFieldSize(t *testing.T) {
	t.Parallel()

	const (
		tagBytes   = 1 // every field is counted with a single tag byte
		ackIDLen   = 3 // each mock ack ID is three characters long
		ackIDCount = 5 // number of mock ack IDs in each request
	)
	// Every ack ID costs a tag byte, one length byte, and the ID itself.
	ackIDsWant := ackIDCount * (tagBytes + 1 + ackIDLen)

	// Mock ack request with a short subscription name.
	ackReq := &pb.AcknowledgeRequest{
		Subscription: "sub",
		AckIds:       []string{"aaa", "bbb", "ccc", "ddd", "eee"},
	}
	got := calcFieldSizeString(ackReq.Subscription) + calcFieldSizeString(ackReq.AckIds...)
	// Short subscription name: tag byte + one length byte + the name.
	want := tagBytes + 1 + len(ackReq.Subscription) + ackIDsWant
	if got != want {
		t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", got, want)
	}

	// A 300-byte subscription name needs two bytes to encode its length.
	ackReq.Subscription = string(bytes.Repeat([]byte("A"), 300))
	got = calcFieldSizeString(ackReq.Subscription) + calcFieldSizeString(ackReq.AckIds...)
	want = tagBytes + 2 + len(ackReq.Subscription) + ackIDsWant
	if got != want {
		t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", got, want)
	}

	// Mock modack request: same string fields plus an integer deadline.
	modAckReq := &pb.ModifyAckDeadlineRequest{
		Subscription:       "sub",
		AckIds:             []string{"aaa", "bbb", "ccc", "ddd", "eee"},
		AckDeadlineSeconds: 300,
	}
	got = calcFieldSizeString(modAckReq.Subscription) +
		calcFieldSizeString(modAckReq.AckIds...) +
		calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds))
	// The deadline value 300 takes two varint bytes on top of its tag byte.
	want = tagBytes + 1 + len(modAckReq.Subscription) +
		ackIDsWant +
		tagBytes + 2
	if got != want {
		t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", got, want)
	}
}
func TestAckDistribution(t *testing.T) {
if testing.Short() {
t.SkipNow()
......
......@@ -26,20 +26,14 @@ import (
"google.golang.org/grpc/status"
)
// maxPayload is the maximum number of bytes to devote to actual ids in
// acknowledgement or modifyAckDeadline requests. A serialized
// AcknowledgeRequest proto has a small constant overhead, plus the size of the
// subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A
// ModifyAckDeadlineRequest has an additional few bytes for the deadline. We
// don't know the subscription name here, so we just assume the size exclusive
// of ids is 100 bytes.
// maxPayload is the maximum number of bytes to devote to the
// encoded AcknowledgeRequest / ModifyAckDeadlineRequest proto message.
//
// With gRPC there is no way for the client to know the server's max message size (it is
// configurable on the server). We know from experience that it
// is 512K.
const (
	// maxPayload is 512 KiB (512 * 1024 bytes).
	maxPayload = 512 * 1024
	// overheadPerID is the number of encoding bytes counted per ack ID.
	overheadPerID = 3 // 3 bytes per ID (a tag byte and two size bytes)
	// maxSendRecvBytes is 20 MiB. NOTE(review): presumably the gRPC
	// send/receive message size limit; confirm at its use site.
	maxSendRecvBytes = 20 * 1024 * 1024 // 20M
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment