Commit 0b8cc444 authored by Jean de Klerk's avatar Jean de Klerk
Browse files

spanner: add benchwrapper

Change-Id: I434182a0c7de081beb81a538e12a154fd0b5bec9
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44290

Reviewed-by: default avatarEmmanuel Odeke <[email protected]>
parent 140f0fd3
# Benchwrapper
A small gRPC wrapper around the spanner client library. This allows the
benchmarking code to prod at spanner without speaking Go.
## Running
```
cd spanner/internal/benchwrapper
export SPANNER_EMULATOR_HOST=localhost:8080
go run *.go --port=8081
```
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package main wraps the client library in a gRPC interface that a benchmarker
// can communicate through.
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"cloud.google.com/go/spanner"
pb "cloud.google.com/go/spanner/internal/benchwrapper/proto"
"google.golang.org/api/iterator"
"google.golang.org/grpc"
)
var port = flag.String("port", "", "specify a port to run on")
func main() {
flag.Parse()
if *port == "" {
log.Fatalf("usage: %s --port=8081", os.Args[0])
}
if os.Getenv("SPANNER_EMULATOR_HOST") == "" {
log.Fatal("This benchmarking server only works when connected to an emulator. Please set SPANNER_EMULATOR_HOST.")
}
ctx := context.Background()
c, err := spanner.NewClient(ctx, "projects/someproject/instances/someinstance/databases/somedatabase")
if err != nil {
log.Fatal(err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *port))
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
pb.RegisterSpannerBenchWrapperServer(s, &server{
c: c,
})
log.Printf("Running on localhost:%s\n", *port)
log.Fatal(s.Serve(lis))
}
type server struct {
c *spanner.Client
}
func (s *server) Read(ctx context.Context, req *pb.ReadQuery) (*pb.EmptyResponse, error) {
it := s.c.ReadOnlyTransaction().Query(context.Background(), spanner.Statement{SQL: req.Query})
for {
_, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Fatal(err)
}
// Do nothing with the data.
}
return &pb.EmptyResponse{}, nil
}
func (s *server) Insert(ctx context.Context, req *pb.InsertQuery) (*pb.EmptyResponse, error) {
var muts []*spanner.Mutation
for _, i := range req.Users {
muts = append(muts, spanner.Insert("sometable", []string{"name", "age"}, []interface{}{i.Name, i.Age}))
}
if _, err := s.c.Apply(context.Background(), muts); err != nil {
log.Fatal(err)
}
// Do nothing with the data.
return &pb.EmptyResponse{}, nil
}
func (s *server) Update(ctx context.Context, req *pb.UpdateQuery) (*pb.EmptyResponse, error) {
var stmts []spanner.Statement
for _, q := range req.Queries {
stmts = append(stmts, spanner.Statement{SQL: q})
}
if _, err := s.c.ReadWriteTransaction(context.Background(), func(ctx2 context.Context, tx *spanner.ReadWriteTransaction) error {
_, err := tx.BatchUpdate(ctx2, stmts)
return err
}); err != nil {
log.Fatal(err)
}
// Do nothing with the data.
return &pb.EmptyResponse{}, nil
}
# Regenerating protos
```
cd spanner/internal/benchwrapper/proto
protoc --go_out=plugins=grpc:. *.proto
```
\ No newline at end of file
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: spanner.proto
package spanner_bench
import (
context "context"
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type ReadQuery struct {
// The query to use in the read call.
Query string `protobuf:"bytes,1,opt,name=Query,proto3" json:"Query,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadQuery) Reset() { *m = ReadQuery{} }
func (m *ReadQuery) String() string { return proto.CompactTextString(m) }
func (*ReadQuery) ProtoMessage() {}
func (*ReadQuery) Descriptor() ([]byte, []int) {
return fileDescriptor_879d3e919e93c6ba, []int{0}
}
func (m *ReadQuery) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadQuery.Unmarshal(m, b)
}
func (m *ReadQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadQuery.Marshal(b, m, deterministic)
}
func (m *ReadQuery) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadQuery.Merge(m, src)
}
func (m *ReadQuery) XXX_Size() int {
return xxx_messageInfo_ReadQuery.Size(m)
}
func (m *ReadQuery) XXX_DiscardUnknown() {
xxx_messageInfo_ReadQuery.DiscardUnknown(m)
}
var xxx_messageInfo_ReadQuery proto.InternalMessageInfo
func (m *ReadQuery) GetQuery() string {
if m != nil {
return m.Query
}
return ""
}
type User struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Age int64 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *User) Reset() { *m = User{} }
func (m *User) String() string { return proto.CompactTextString(m) }
func (*User) ProtoMessage() {}
func (*User) Descriptor() ([]byte, []int) {
return fileDescriptor_879d3e919e93c6ba, []int{1}
}
func (m *User) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_User.Unmarshal(m, b)
}
func (m *User) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_User.Marshal(b, m, deterministic)
}
func (m *User) XXX_Merge(src proto.Message) {
xxx_messageInfo_User.Merge(m, src)
}
func (m *User) XXX_Size() int {
return xxx_messageInfo_User.Size(m)
}
func (m *User) XXX_DiscardUnknown() {
xxx_messageInfo_User.DiscardUnknown(m)
}
var xxx_messageInfo_User proto.InternalMessageInfo
func (m *User) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *User) GetAge() int64 {
if m != nil {
return m.Age
}
return 0
}
type InsertQuery struct {
// The query to use in the insert call.
Users []*User `protobuf:"bytes,1,rep,name=users,proto3" json:"users,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *InsertQuery) Reset() { *m = InsertQuery{} }
func (m *InsertQuery) String() string { return proto.CompactTextString(m) }
func (*InsertQuery) ProtoMessage() {}
func (*InsertQuery) Descriptor() ([]byte, []int) {
return fileDescriptor_879d3e919e93c6ba, []int{2}
}
func (m *InsertQuery) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_InsertQuery.Unmarshal(m, b)
}
func (m *InsertQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_InsertQuery.Marshal(b, m, deterministic)
}
func (m *InsertQuery) XXX_Merge(src proto.Message) {
xxx_messageInfo_InsertQuery.Merge(m, src)
}
func (m *InsertQuery) XXX_Size() int {
return xxx_messageInfo_InsertQuery.Size(m)
}
func (m *InsertQuery) XXX_DiscardUnknown() {
xxx_messageInfo_InsertQuery.DiscardUnknown(m)
}
var xxx_messageInfo_InsertQuery proto.InternalMessageInfo
func (m *InsertQuery) GetUsers() []*User {
if m != nil {
return m.Users
}
return nil
}
type UpdateQuery struct {
// The queries to use in the update call.
Queries []string `protobuf:"bytes,1,rep,name=Queries,proto3" json:"Queries,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *UpdateQuery) Reset() { *m = UpdateQuery{} }
func (m *UpdateQuery) String() string { return proto.CompactTextString(m) }
func (*UpdateQuery) ProtoMessage() {}
func (*UpdateQuery) Descriptor() ([]byte, []int) {
return fileDescriptor_879d3e919e93c6ba, []int{3}
}
func (m *UpdateQuery) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UpdateQuery.Unmarshal(m, b)
}
func (m *UpdateQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_UpdateQuery.Marshal(b, m, deterministic)
}
func (m *UpdateQuery) XXX_Merge(src proto.Message) {
xxx_messageInfo_UpdateQuery.Merge(m, src)
}
func (m *UpdateQuery) XXX_Size() int {
return xxx_messageInfo_UpdateQuery.Size(m)
}
func (m *UpdateQuery) XXX_DiscardUnknown() {
xxx_messageInfo_UpdateQuery.DiscardUnknown(m)
}
var xxx_messageInfo_UpdateQuery proto.InternalMessageInfo
func (m *UpdateQuery) GetQueries() []string {
if m != nil {
return m.Queries
}
return nil
}
type EmptyResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *EmptyResponse) Reset() { *m = EmptyResponse{} }
func (m *EmptyResponse) String() string { return proto.CompactTextString(m) }
func (*EmptyResponse) ProtoMessage() {}
func (*EmptyResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_879d3e919e93c6ba, []int{4}
}
func (m *EmptyResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_EmptyResponse.Unmarshal(m, b)
}
func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic)
}
func (m *EmptyResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_EmptyResponse.Merge(m, src)
}
func (m *EmptyResponse) XXX_Size() int {
return xxx_messageInfo_EmptyResponse.Size(m)
}
func (m *EmptyResponse) XXX_DiscardUnknown() {
xxx_messageInfo_EmptyResponse.DiscardUnknown(m)
}
var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo
func init() {
proto.RegisterType((*ReadQuery)(nil), "spanner_bench.ReadQuery")
proto.RegisterType((*User)(nil), "spanner_bench.User")
proto.RegisterType((*InsertQuery)(nil), "spanner_bench.InsertQuery")
proto.RegisterType((*UpdateQuery)(nil), "spanner_bench.UpdateQuery")
proto.RegisterType((*EmptyResponse)(nil), "spanner_bench.EmptyResponse")
}
func init() { proto.RegisterFile("spanner.proto", fileDescriptor_879d3e919e93c6ba) }
var fileDescriptor_879d3e919e93c6ba = []byte{
// 254 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xb1, 0x4e, 0xc3, 0x30,
0x10, 0x86, 0x31, 0x49, 0x8b, 0x72, 0x51, 0x05, 0xba, 0x32, 0x58, 0x15, 0x43, 0xf0, 0x42, 0x90,
0x50, 0x86, 0xb2, 0x30, 0x22, 0x04, 0x03, 0x23, 0x46, 0x15, 0x23, 0x72, 0xe9, 0x09, 0x18, 0xea,
0x58, 0xb6, 0x3b, 0xf4, 0x79, 0x79, 0x11, 0xe4, 0x1c, 0x45, 0x6d, 0x18, 0xe8, 0xf6, 0xff, 0xf2,
0xdd, 0x77, 0xfa, 0x64, 0x18, 0x05, 0x67, 0xac, 0x25, 0xdf, 0x38, 0xdf, 0xc6, 0x16, 0x37, 0xf5,
0x75, 0x4e, 0xf6, 0xed, 0x43, 0x9d, 0x43, 0xa1, 0xc9, 0x2c, 0x9e, 0x56, 0xe4, 0xd7, 0x78, 0x0a,
0x83, 0x2e, 0x48, 0x51, 0x89, 0xba, 0xd0, 0x5c, 0xd4, 0x15, 0xe4, 0xb3, 0x40, 0x1e, 0x11, 0x72,
0x6b, 0x96, 0xf4, 0xf3, 0xd8, 0x65, 0x3c, 0x81, 0xcc, 0xbc, 0x93, 0x3c, 0xac, 0x44, 0x9d, 0xe9,
0x14, 0xd5, 0x0d, 0x94, 0x8f, 0x36, 0x90, 0x8f, 0x8c, 0xbc, 0x84, 0xc1, 0x2a, 0x90, 0x0f, 0x52,
0x54, 0x59, 0x5d, 0x4e, 0xc7, 0xcd, 0xce, 0xf9, 0x26, 0x81, 0x35, 0x4f, 0xa8, 0x0b, 0x28, 0x67,
0x6e, 0x61, 0x22, 0xf1, 0xa6, 0x84, 0xa3, 0x14, 0x3e, 0x89, 0x77, 0x0b, 0xbd, 0xa9, 0xea, 0x18,
0x46, 0x0f, 0x4b, 0x17, 0xd7, 0x9a, 0x82, 0x6b, 0x6d, 0xa0, 0xe9, 0x97, 0x80, 0xf1, 0x33, 0x73,
0xef, 0x12, 0xf6, 0xc5, 0x1b, 0xe7, 0xc8, 0xe3, 0x2d, 0xe4, 0x49, 0x0e, 0x65, 0xef, 0xea, 0xaf,
0xf1, 0xe4, 0xac, 0xf7, 0xb2, 0xc3, 0x55, 0x07, 0x78, 0x0f, 0x43, 0xb6, 0xc1, 0x49, 0x6f, 0x72,
0x4b, 0x72, 0x1f, 0x0a, 0x9b, 0xfd, 0xa1, 0x6c, 0x09, 0xff, 0x47, 0x99, 0x0f, 0xbb, 0x0f, 0xbc,
0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x1e, 0x2e, 0xa6, 0x11, 0xd1, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// SpannerBenchWrapperClient is the client API for SpannerBenchWrapper service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type SpannerBenchWrapperClient interface {
// Read represents operations like Go's ReadOnlyTransaction.Query, Java's
// ReadOnlyTransaction.executeQuery, Python's snapshot.read, and Node's
// Transaction.Read.
//
// It will typically be used to read many items.
Read(ctx context.Context, in *ReadQuery, opts ...grpc.CallOption) (*EmptyResponse, error)
// Insert represents operations like Go's Client.Apply, Java's
// DatabaseClient.writeAtLeastOnce, Python's transaction.commit, and Node's
// Transaction.Commit.
//
// It will typically be used to insert many items.
Insert(ctx context.Context, in *InsertQuery, opts ...grpc.CallOption) (*EmptyResponse, error)
// Update represents operations like Go's ReadWriteTransaction.BatchUpdate,
// Java's TransactionRunner.run, Python's Batch.update, and Node's
// Transaction.BatchUpdate.
//
// It will typically be used to update many items.
Update(ctx context.Context, in *UpdateQuery, opts ...grpc.CallOption) (*EmptyResponse, error)
}
type spannerBenchWrapperClient struct {
cc *grpc.ClientConn
}
func NewSpannerBenchWrapperClient(cc *grpc.ClientConn) SpannerBenchWrapperClient {
return &spannerBenchWrapperClient{cc}
}
func (c *spannerBenchWrapperClient) Read(ctx context.Context, in *ReadQuery, opts ...grpc.CallOption) (*EmptyResponse, error) {
out := new(EmptyResponse)
err := c.cc.Invoke(ctx, "/spanner_bench.SpannerBenchWrapper/Read", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *spannerBenchWrapperClient) Insert(ctx context.Context, in *InsertQuery, opts ...grpc.CallOption) (*EmptyResponse, error) {
out := new(EmptyResponse)
err := c.cc.Invoke(ctx, "/spanner_bench.SpannerBenchWrapper/Insert", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *spannerBenchWrapperClient) Update(ctx context.Context, in *UpdateQuery, opts ...grpc.CallOption) (*EmptyResponse, error) {
out := new(EmptyResponse)
err := c.cc.Invoke(ctx, "/spanner_bench.SpannerBenchWrapper/Update", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// SpannerBenchWrapperServer is the server API for SpannerBenchWrapper service.
type SpannerBenchWrapperServer interface {
// Read represents operations like Go's ReadOnlyTransaction.Query, Java's
// ReadOnlyTransaction.executeQuery, Python's snapshot.read, and Node's
// Transaction.Read.
//
// It will typically be used to read many items.
Read(context.Context, *ReadQuery) (*EmptyResponse, error)
// Insert represents operations like Go's Client.Apply, Java's
// DatabaseClient.writeAtLeastOnce, Python's transaction.commit, and Node's
// Transaction.Commit.
//
// It will typically be used to insert many items.
Insert(context.Context, *InsertQuery) (*EmptyResponse, error)
// Update represents operations like Go's ReadWriteTransaction.BatchUpdate,
// Java's TransactionRunner.run, Python's Batch.update, and Node's
// Transaction.BatchUpdate.
//
// It will typically be used to update many items.
Update(context.Context, *UpdateQuery) (*EmptyResponse, error)
}
// UnimplementedSpannerBenchWrapperServer can be embedded to have forward compatible implementations.
type UnimplementedSpannerBenchWrapperServer struct {
}
func (*UnimplementedSpannerBenchWrapperServer) Read(ctx context.Context, req *ReadQuery) (*EmptyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
}
func (*UnimplementedSpannerBenchWrapperServer) Insert(ctx context.Context, req *InsertQuery) (*EmptyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Insert not implemented")
}
func (*UnimplementedSpannerBenchWrapperServer) Update(ctx context.Context, req *UpdateQuery) (*EmptyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Update not implemented")
}
func RegisterSpannerBenchWrapperServer(s *grpc.Server, srv SpannerBenchWrapperServer) {
s.RegisterService(&_SpannerBenchWrapper_serviceDesc, srv)
}
func _SpannerBenchWrapper_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadQuery)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SpannerBenchWrapperServer).Read(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/spanner_bench.SpannerBenchWrapper/Read",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SpannerBenchWrapperServer).Read(ctx, req.(*ReadQuery))
}
return interceptor(ctx, in, info, handler)
}
func _SpannerBenchWrapper_Insert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InsertQuery)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SpannerBenchWrapperServer).Insert(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/spanner_bench.SpannerBenchWrapper/Insert",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SpannerBenchWrapperServer).Insert(ctx, req.(*InsertQuery))
}
return interceptor(ctx, in, info, handler)
}
func _SpannerBenchWrapper_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateQuery)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SpannerBenchWrapperServer).Update(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/spanner_bench.SpannerBenchWrapper/Update",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SpannerBenchWrapperServer).Update(ctx, req.(*UpdateQuery))
}
return interceptor(ctx, in, info, handler)
}
var _SpannerBenchWrapper_serviceDesc = grpc.ServiceDesc{
ServiceName: "spanner_bench.SpannerBenchWrapper",
HandlerType: (*SpannerBenchWrapperServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Read",
Handler: _SpannerBenchWrapper_Read_Handler,
},
{
MethodName: "Insert",
Handler: _SpannerBenchWrapper_Insert_Handler,
},
{
MethodName: "Update",
Handler: _SpannerBenchWrapper_Update_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "spanner.proto",
}
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package spanner_bench;
message ReadQuery{
// The query to use in the read call.
string Query = 1;