Commit 9e18ddb0 authored by Jean de Klerk

pubsub: add ordered keys

Pub/Sub ordered keys require the following:

- A Publish with a non-empty ordering key is added to a per-key queue, as
implemented by Bundler.HandlerLimit=1. This setting causes the bundler to have
only one outstanding bundle being handled at a time; further bundles form a
queue.
- A received message with a non-empty ordering key is added to a per-key queue,
as implemented by a slice. When a worker operates on a key, it processes all
items in the key's slice until the slice is empty, at which point it deletes the
key to release the key's resources, and the worker returns to the worker pool.
(A minimal sketch of this mechanism follows the list.)
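
The receive-side mechanism can be sketched roughly as follows. This is a hedged
illustration only: the package, type, and function names (sketch, keyedQueue,
add) are hypothetical and are not the code added in this commit.

    package sketch

    import "sync"

    // keyedQueue is a minimal sketch of the receive-side structure described
    // above: handlers for messages with the same ordering key are appended to
    // a per-key slice, and at most one worker drains that slice at a time.
    type keyedQueue struct {
        mu      sync.Mutex
        queues  map[string][]func() // per-key FIFO of message handlers
        workers chan struct{}       // bounded pool of worker slots
    }

    func newKeyedQueue(workers int) *keyedQueue {
        return &keyedQueue{
            queues:  make(map[string][]func()),
            workers: make(chan struct{}, workers),
        }
    }

    // add enqueues handle under key. The first item for a key starts a worker,
    // which drains the key's slice until it is empty, then deletes the key to
    // release its resources and returns its slot to the pool.
    func (q *keyedQueue) add(key string, handle func()) {
        q.mu.Lock()
        _, active := q.queues[key] // true if a worker already owns this key
        q.queues[key] = append(q.queues[key], handle)
        q.mu.Unlock()
        if active {
            return
        }
        go func() {
            q.workers <- struct{}{}        // claim a worker slot
            defer func() { <-q.workers }() // return it once the key is drained
            for {
                q.mu.Lock()
                items := q.queues[key]
                if len(items) == 0 {
                    delete(q.queues, key) // release the key's resources
                    q.mu.Unlock()
                    return
                }
                q.queues[key] = nil // keep the key present while draining
                q.mu.Unlock()
                for _, h := range items {
                    h() // handlers for one key run sequentially, in order
                }
            }
        }()
    }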

Functionally, for users, this should involve the following (a caller-side
publishing sketch follows the list):

- Slightly slower processing speeds, due to the additional data structures
along the Publish and Receive paths. (We should do some performance testing
to verify this.)
- PublishSettings.NumGoroutines and ReceiveSettings.NumGoroutines each now
result in roughly double the number of goroutines, since an additional data
structure (the publish/receive scheduler) also uses that value. The
documentation has been adjusted accordingly to describe the setting as a
scaling factor rather than an exact number.
- The default ReceiveSettings.NumGoroutines is increased from 1 to 10. Multiple
workers generally provide a better experience than a single one, and some
tests, like TestStreamingPullFlowControl, implicitly require it.
- Since ordered keys allow only a single outstanding RPC at a time, it is
possible to send ordered-key messages to Topic.Publish (and subsequently to
PublishScheduler.Add) faster than the bundler can publish them to the
Pub/Sub service, resulting in a backed-up queue of Pub/Sub bundles. Each
item in the bundler queue is a goroutine, which means users may see thousands
of goroutines while debugging.
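
For reference, a caller-side sketch of publishing with an ordering key through
the public API. This is an illustration under assumptions, not code added by
this change; the project and topic IDs are placeholders.

    package main

    import (
        "context"
        "fmt"
        "log"

        "cloud.google.com/go/pubsub"
    )

    func main() {
        ctx := context.Background()

        // "my-project" and "my-topic" are placeholders.
        client, err := pubsub.NewClient(ctx, "my-project")
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        topic := client.Topic("my-topic")
        topic.EnableMessageOrdering = true // opt in to ordered publishing
        defer topic.Stop()

        var results []*pubsub.PublishResult
        for i := 0; i < 10; i++ {
            results = append(results, topic.Publish(ctx, &pubsub.Message{
                Data:        []byte(fmt.Sprintf("item-%d", i)),
                OrderingKey: "my-key", // messages sharing a key publish in order
            }))
        }
        for _, r := range results {
            if _, err := r.Get(ctx); err != nil {
                log.Fatal(err)
            }
        }
    }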

TODO: add Resume method.

Change-Id: Ib710944d557970290cf12321d1fdbd9b1699231d
parent f013ab08
@@ -15,10 +15,13 @@
package pubsub
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"strings"
"sync"
"testing"
"time"
@@ -28,6 +31,7 @@ import (
"cloud.google.com/go/internal/uid"
"cloud.google.com/go/internal/version"
kms "cloud.google.com/go/kms/apiv1"
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
@@ -1081,3 +1085,191 @@ func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {
t.Fatalf("\ngot: - want: +\n%s", diff)
}
}
func TestIntegration_OrderedKeys_Basic(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
client := integrationTestClient(ctx, t)
defer client.Close()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
_ = sub
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}
topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true
orderingKey := "some-ordering-key"
numItems := 1000
for i := 0; i < numItems; i++ {
r := topic.Publish(ctx, &Message{
ID: fmt.Sprintf("id-%d", i),
Data: []byte(fmt.Sprintf("item-%d", i)),
OrderingKey: orderingKey,
})
go func() {
<-r.ready
if r.err != nil {
t.Error(r.err)
}
}()
}
received := make(chan string, numItems)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}
received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}
}()
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Fatalf("%d: got %s, want %s", i, got, want)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out after 5s waiting for item %d", i)
}
}
}
func TestIntegration_OrderedKeys_JSON(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
client := integrationTestClient(ctx, t)
defer client.Close()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
_ = sub
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}
topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true
inFile, err := os.Open("testdata/publish.csv")
if err != nil {
t.Fatal(err)
}
defer inFile.Close()
mu := sync.Mutex{}
var publishData []testutil2.OrderedKeyMsg
var receiveData []testutil2.OrderedKeyMsg
wg := sync.WaitGroup{}
scanner := bufio.NewScanner(inFile)
for scanner.Scan() {
line := scanner.Text()
// TODO: use strings.ReplaceAll once Go 1.12 is the minimum supported version.
line = strings.Replace(line, "\"", "", -1)
parts := strings.Split(line, ",")
key := parts[0]
msg := parts[1]
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
topic.Publish(ctx, &Message{
ID: msg,
Data: []byte(msg),
OrderingKey: key,
})
wg.Add(1)
}
if err := scanner.Err(); err != nil {
t.Fatal(err)
}
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
mu.Lock()
receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
mu.Unlock()
wg.Done()
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}
}()
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(15 * time.Second):
t.Fatal("timed out after 15s waiting for all messages to be received")
}
mu.Lock()
defer mu.Unlock()
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
t.Fatal(err)
}
}
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"errors"
"reflect"
"sync"
"time"
"google.golang.org/api/support/bundler"
)
// PublishScheduler is a scheduler which is designed for Pub/Sub's Publish flow.
// It bundles items before handling them. All items in this PublishScheduler use
// the same handler.
//
// Each item is added with a given key. Items added to the empty string key are
// handled in random order. Items added to any other key are handled
// sequentially.
type PublishScheduler struct {
// Settings passed down to each bundler that gets created.
DelayThreshold time.Duration
BundleCountThreshold int
BundleByteThreshold int
BundleByteLimit int
BufferedByteLimit int
mu sync.Mutex
bundlers map[string]*bundler.Bundler
outstanding map[string]int
// workers is a channel that represents workers. Rather than a pool, where
// workers are "removed" until the pool is empty, the channel is more like a
// set of work desks, where workers are "added" until all the desks are full.
//
// workers does not restrict the amount of goroutines in the bundlers.
// Rather, it acts as the flow control for completion of bundler work.
workers chan struct{}
handle func(bundle interface{})
done chan struct{}
}
// NewPublishScheduler returns a new PublishScheduler.
//
// The workers arg is the number of workers that will operate on the queue of
// work. A reasonably large number of workers is highly recommended. If the
// workers arg is 0, then a healthy default of 10 workers is used.
//
// The scheduler does not use a parent context. If it did, canceling that
// context would immediately stop the scheduler without waiting for
// undelivered messages.
//
// The scheduler should be stopped only with FlushAndStop.
func NewPublishScheduler(workers int, handle func(bundle interface{})) *PublishScheduler {
if workers == 0 {
workers = 10
}
s := PublishScheduler{
bundlers: make(map[string]*bundler.Bundler),
outstanding: make(map[string]int),
workers: make(chan struct{}, workers),
handle: handle,
done: make(chan struct{}),
}
return &s
}
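// Example usage (an illustrative sketch, not part of this change): items
// added under the same key are handled sequentially, in order; items added
// under the empty key may be handled concurrently.
//
//	s := NewPublishScheduler(10, func(bundle interface{}) {
//		for _, item := range bundle.([]string) {
//			fmt.Println(item) // handle one bundled item
//		}
//	})
//	defer s.FlushAndStop()
//	_ = s.Add("my-key", "first", len("first"))
//	_ = s.Add("my-key", "second", len("second"))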
// Add adds an item to the scheduler at a given key.
//
// Add never blocks. Buffering happens in the scheduler's bundlers. There is
// no flow control.
//
// Since ordered keys require only a single outstanding RPC at once, it is
// possible to send ordered key messages to Topic.Publish (and subsequently to
// PublishScheduler.Add) faster than the bundler can publish them to the
// Pub/Sub service, resulting in a backed up queue of Pub/Sub bundles. Each
// item in the bundler queue is a goroutine.
func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
select {
case <-s.done:
return errors.New("draining")
default:
}
s.mu.Lock()
defer s.mu.Unlock()
b, ok := s.bundlers[key]
if !ok {
s.outstanding[key] = 1
b = bundler.NewBundler(item, func(bundle interface{}) {
s.workers <- struct{}{}
s.handle(bundle)
<-s.workers
nlen := reflect.ValueOf(bundle).Len()
s.mu.Lock()
s.outstanding[key] -= nlen
if s.outstanding[key] == 0 {
delete(s.outstanding, key)
delete(s.bundlers, key)
}
s.mu.Unlock()
})
b.DelayThreshold = s.DelayThreshold
b.BundleCountThreshold = s.BundleCountThreshold
b.BundleByteThreshold = s.BundleByteThreshold
b.BundleByteLimit = s.BundleByteLimit
b.BufferedByteLimit = s.BufferedByteLimit
if b.BufferedByteLimit == 0 {
b.BufferedByteLimit = 1e9
}
if key == "" {
// There's no way to express "unlimited" in the bundler, so we use
// some high number.
b.HandlerLimit = 1e9
} else {
// HandlerLimit=1 causes the bundler to act as a sequential queue.
b.HandlerLimit = 1
}
s.bundlers[key] = b
}
s.outstanding[key]++
return b.Add(item, size)
}
// FlushAndStop begins flushing items from bundlers and from the scheduler. It
// blocks until all items have been flushed.
func (s *PublishScheduler) FlushAndStop() {
close(s.done)
for _, b := range s.bundlers {
b.Flush()
}
}
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler_test
import (
"fmt"
"reflect"
"testing"
"cloud.google.com/go/pubsub/internal/scheduler"
)
const pubSchedulerWorkers = 100
func BenchmarkPublisher_Unkeyed(b *testing.B) {
wait := make(chan struct{}, b.N)
ps := scheduler.NewPublishScheduler(pubSchedulerWorkers, func(bundle interface{}) {
nlen := reflect.ValueOf(bundle).Len()
for i := 0; i < nlen; i++ {
wait <- struct{}{}
}
})
go func() {
for i := 0; i < b.N; i++ {
if err := ps.Add("", fmt.Sprintf("item_%d", i), 1); err != nil {
b.Error(err)
}
}
}()
for j := 0; j < b.N; j++ {
<-wait
}
}
func BenchmarkPublisher_SingleKey(b *testing.B) {
wait := make(chan struct{}, b.N)
ps := scheduler.NewPublishScheduler(pubSchedulerWorkers, func(bundle interface{}) {
nlen := reflect.ValueOf(bundle).Len()
for i := 0; i < nlen; i++ {
wait <- struct{}{}
}
})
go func() {
for i := 0; i < b.N; i++ {
if err := ps.Add("some-key", fmt.Sprintf("item_%d", i), 1); err != nil {
b.Error(err)
}
}
}()
for j := 0; j < b.N; j++ {
<-wait
}
}
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler_test
import (
"fmt"
"testing"
"time"
"cloud.google.com/go/pubsub/internal/scheduler"
)
func TestPublishScheduler_Put_Basic(t *testing.T) {
done := make(chan struct{})
defer close(done)
keysHandled := map[string]chan int{}
handle := func(itemi interface{}) {
items := itemi.([]pair)
for _, item := range items {
keysHandled[item.k] <- item.v
}
}
s := scheduler.NewPublishScheduler(2, handle)
defer s.FlushAndStop()
// If these values are too high, the race detector will fail with
// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
numItems := 100
numKeys := 10
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
keysHandled[k] = make(chan int, numItems)
}
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
go func() {
for i := 0; i < numItems; i++ {
select {
case <-done:
return
default:
}
if err := s.Add(k, pair{k, i}, 1); err != nil {
t.Error(err)
}
}
}()
}
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
for want := 0; want < numItems; want++ {
select {
case got := <-keysHandled[k]:
if got != want {
t.Fatalf("%s: got %d, want %d", k, got, want)
}
case <-time.After(5 * time.Second):
t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
}
}
}
}
// Scheduler schedules many items of one key in order even when there are
// many workers.
func TestPublishScheduler_Put_ManyWithOneKey(t *testing.T) {
done := make(chan struct{})
defer close(done)
recvd := make(chan int)
handle := func(itemi interface{}) {
items := itemi.([]int)
for _, item := range items {
recvd <- item
}
}
s := scheduler.NewPublishScheduler(10, handle)
defer s.FlushAndStop()
// If these values are too high, the race detector will fail with
// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
numItems := 1000
go func() {
for i := 0; i < numItems; i++ {
select {
case <-done:
return
default:
}
if err := s.Add("some-key", i, 1); err != nil {
t.Error(err)
}
}
}()
for want := 0; want < numItems; want++ {
select {
case got := <-recvd:
if got != want {
t.Fatalf("got %d, want %d", got, want)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for item %d to be handled", want)
}
}
}
func TestPublishScheduler_DoesntRaceWithPublisher(t *testing.T) {
done := make(chan struct{})
defer close(done)
keysHandled := map[string]chan int{}
handle := func(itemi interface{}) {
items := itemi.([]pair)
for _, item := range items {
keysHandled[item.k] <- item.v
}
}
s := scheduler.NewPublishScheduler(2, handle)
defer s.FlushAndStop()
// If these values are too high, the race detector will fail with
// "race: limit on 8128 simultaneously alive goroutines is exceeded, dying".
numItems := 100
numKeys := 10
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
keysHandled[k] = make(chan int, numItems)
}
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
go func() {
for i := 0; i < numItems; i++ {
select {
case <-done:
return
default:
}
if err := s.Add(k, pair{k, i}, 1); err != nil {
t.Error(err)
}
}
}()
}
for ki := 0; ki < numKeys; ki++ {
k := fmt.Sprintf("some_key_%d", ki)
for want := 0; want < numItems; want++ {
select {
case got := <-keysHandled[k]:
if got != want {