Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .testdata/tarantool_bootstrap_3301.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env tarantool
-- Details: https:www.tarantool.io/en/doc/latest/reference/configuration/
box.cfg{
work_dir = '/tmp/tarantool_data/3301',
memtx_dir = '/tmp/tarantool_data/3301/memtx',
wal_dir = '/tmp/tarantool_data/3301/wal',
memtx_memory = 1048576000, -- 100Mb
sql_cache_size = 104857600, -- 100Mb
memtx_use_mvcc_engine = true,
pid_file = "tarantool3301.pid",
worker_pool_threads = 16,
iproto_threads = 8,
listen = 3301,
checkpoint_interval = 0,
replication_synchro_quorum = 3,
replication_synchro_timeout = 10,
election_mode = 'candidate',
replication = {
'replicator:password@localhost:3301',
'replicator:password@localhost:3302',
'replicator:password@localhost:3303'
},
read_only = false,
}

box.once("schema", function()
box.schema.user.create('replicator', {password = 'password'})
box.schema.user.grant('replicator', 'replication')

box.schema.user.passwd('pass')

box.execute([[CREATE TABLE IF NOT EXISTS test_table (
id VARCHAR(40) primary key ,
name VARCHAR(100) NOT NULL,
type int NOT NULL
);]])

box.execute([[CREATE UNIQUE INDEX IF NOT EXISTS t_idx_1 ON test_table (name, type);]])

box.space.TEST_TABLE:alter({is_sync = true})

print('box.once executed')
end)

function get_cluster_members()
a = {}
a["uri"] = 'localhost:3301'
a["type"] = 'writable'
b = {}
b["uri"] = 'localhost:3302'
b["type"] = 'non-writable'
c = {}
c["uri"] = 'localhost:3303'
c["type"] = 'non-writable'

return { a, b, c }
end
56 changes: 56 additions & 0 deletions .testdata/tarantool_bootstrap_3302.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env tarantool
-- Details: https:www.tarantool.io/en/doc/latest/reference/configuration/
box.cfg{
work_dir = '/tmp/tarantool_data/3302',
memtx_dir = '/tmp/tarantool_data/3302/memtx',
wal_dir = '/tmp/tarantool_data/3302/wal',
memtx_memory = 1048576000, -- 100Mb
sql_cache_size = 104857600, -- 100Mb
memtx_use_mvcc_engine = true,
pid_file = "tarantool3302.pid",
worker_pool_threads = 16,
iproto_threads = 8,
listen = 3302,
checkpoint_interval = 0,
replication_synchro_quorum = 3,
replication_synchro_timeout = 10,
election_mode = 'voter',
replication = {
'replicator:password@localhost:3301',
'replicator:password@localhost:3302',
'replicator:password@localhost:3303'
},
read_only = false,
}

box.once("schema", function()
box.schema.user.create('replicator', {password = 'password'})
box.schema.user.grant('replicator', 'replication')

box.schema.user.passwd('pass')

box.execute([[CREATE TABLE IF NOT EXISTS test_table (
id VARCHAR(40) primary key ,
name VARCHAR(100) NOT NULL,
type int NOT NULL
);]])

box.execute([[CREATE UNIQUE INDEX IF NOT EXISTS t_idx_1 ON test_table (name, type);]])
box.space.TEST_TABLE:alter({is_sync = true})

print('box.once executed')
end)

function get_cluster_members()
a = {}
a["uri"] = 'localhost:3301'
a["type"] = 'writable'
b = {}
b["uri"] = 'localhost:3302'
b["type"] = 'non-writable'
c = {}
c["uri"] = 'localhost:3303'
c["type"] = 'non-writable'

return { a, b, c }
end
56 changes: 56 additions & 0 deletions .testdata/tarantool_bootstrap_3303.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env tarantool
-- Details: https:www.tarantool.io/en/doc/latest/reference/configuration/
box.cfg{
work_dir = '/tmp/tarantool_data/3303',
memtx_dir = '/tmp/tarantool_data/3303/memtx',
wal_dir = '/tmp/tarantool_data/3303/wal',
memtx_memory = 1048576000, -- 100Mb
sql_cache_size = 104857600, -- 100Mb
memtx_use_mvcc_engine = true,
pid_file = "tarantool3303.pid",
worker_pool_threads = 16,
iproto_threads = 8,
listen = 3303,
checkpoint_interval = 0,
replication_synchro_quorum = 3,
replication_synchro_timeout = 10,
election_mode = 'voter',
replication = {
'replicator:password@localhost:3301',
'replicator:password@localhost:3302',
'replicator:password@localhost:3303'
},
read_only = false,
}

box.once("schema", function()
box.schema.user.create('replicator', {password = 'password'})
box.schema.user.grant('replicator', 'replication')

box.schema.user.passwd('pass')

box.execute([[CREATE TABLE IF NOT EXISTS test_table (
id VARCHAR(40) primary key ,
name VARCHAR(100) NOT NULL,
type int NOT NULL
);]])

box.execute([[CREATE UNIQUE INDEX IF NOT EXISTS t_idx_1 ON test_table (name, type);]])
box.space.TEST_TABLE:alter({is_sync = true})

print('box.once executed')
end)

function get_cluster_members()
a = {}
a["uri"] = 'localhost:3301'
a["type"] = 'writable'
b = {}
b["uri"] = 'localhost:3302'
b["type"] = 'non-writable'
c = {}
c["uri"] = 'localhost:3303'
c["type"] = 'non-writable'

return { a, b, c }
end
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ BENCH_REFERENCE_REPO := ${BENCH_PATH}/go-tarantool
BENCH_OPTIONS := -bench=. -run=^Benchmark -benchmem -benchtime=${DURATION} -count=${COUNT}
GO_TARANTOOL_URL := https://github.com/tarantool/go-tarantool
GO_TARANTOOL_DIR := ${PROJECT_DIR}/${BENCH_PATH}/go-tarantool
TAGS :=
TAGS := go_tarantool_msgpack_v5,go_tarantool_ssl_disable
TTCTL := tt
ifeq (,$(shell which tt 2>/dev/null))
TTCTL := tarantoolctl
Expand Down
3 changes: 2 additions & 1 deletion auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
. "github.com/tarantool/go-tarantool"

. "github.com/ice-blockchain/go-tarantool"
)

func TestAuth_String(t *testing.T) {
Expand Down
98 changes: 98 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package balancer

import (
"fmt"
"strings"
"unicode"

"github.com/ice-blockchain/go-tarantool"
)

type clusterMemberType string

const (
Writable = clusterMemberType("writable")
NonWritable = clusterMemberType("non-writable")
)

func CheckIfRequiresWriteForRequest(req tarantool.Request, _default bool) bool {
switch req.(type) {
case *tarantool.PrepareRequest:
case *tarantool.UnprepareRequest:
case *tarantool.ExecutePreparedRequest:
return _default // How can we get access to privateField to verify with CheckIfRequiresWrite?
case *tarantool.IdRequest:
return _default
case *tarantool.PingRequest:
return _default
case *tarantool.SelectRequest:
return false
case *tarantool.InsertRequest:
return true
case *tarantool.ReplaceRequest:
return true
case *tarantool.DeleteRequest:
return true
case *tarantool.UpdateRequest:
return true
case *tarantool.UpsertRequest:
return true
case *tarantool.CallRequest:
case *tarantool.EvalRequest:
case *tarantool.ExecuteRequest:
case *tarantool.BeginRequest:
return true
case *tarantool.CommitRequest:
return true
case *tarantool.RollbackRequest:
return true
default:
return _default
}
return _default
}

func CheckIfRequiresWrite(expr string, _default bool) (string, bool) {
trimmedS := strings.ToLower(strings.TrimLeftFunc(expr, func(r rune) bool {
return unicode.IsSpace(r) || unicode.IsControl(r)
}))
nonWritableTemplate := fmt.Sprintf("{{%s}}", NonWritable)
if isNonWritable := strings.HasPrefix(trimmedS, nonWritableTemplate); isNonWritable {
return strings.Replace(expr, nonWritableTemplate, "", 1), false
}

writableTemplate := fmt.Sprintf("{{%s}}", Writable)
if isWritable := strings.HasPrefix(trimmedS, writableTemplate); isWritable {
return strings.Replace(expr, writableTemplate, "", 1), true
}

if isWritable := strings.HasPrefix(trimmedS, "insert"); isWritable {
return expr, true
}

if isWritable := strings.HasPrefix(trimmedS, "delete"); isWritable {
return expr, true
}

if isWritable := strings.HasPrefix(trimmedS, "update"); isWritable {
return expr, true
}

if isWritable := strings.HasPrefix(trimmedS, "replace"); isWritable {
return expr, true
}

if isNonWritable := strings.HasPrefix(trimmedS, "values"); isNonWritable {
return expr, false
}

if isNonWritable := strings.HasPrefix(trimmedS, "with"); isNonWritable {
return expr, false
}

if isNonWritable := strings.HasPrefix(trimmedS, "select"); isNonWritable {
return expr, false
}

return expr, _default
}
5 changes: 3 additions & 2 deletions box_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"

"github.com/stretchr/testify/require"
. "github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/test_helpers"

. "github.com/ice-blockchain/go-tarantool"
"github.com/ice-blockchain/go-tarantool/test_helpers"
)

var samples = map[string]BoxError{
Expand Down
4 changes: 2 additions & 2 deletions call_16_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package tarantool_test
import (
"testing"

. "github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/test_helpers"
. "github.com/ice-blockchain/go-tarantool"
"github.com/ice-blockchain/go-tarantool/test_helpers"
)

func TestConnection_Call(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions call_17_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package tarantool_test
import (
"testing"

. "github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/test_helpers"
. "github.com/ice-blockchain/go-tarantool"
"github.com/ice-blockchain/go-tarantool/test_helpers"
)

func TestConnection_Call(t *testing.T) {
Expand Down
54 changes: 54 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log"
"math"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -172,6 +173,9 @@ type Connection struct {
shutdownWatcher Watcher
// requestCnt is a counter of active requests.
requestCnt int64

sqlPreparedStatementCache sync.Map //map[string]*Prepared
cacheMx sync.Mutex
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -1252,6 +1256,56 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
return NewPreparedFromResponse(conn, resp)
}

func (conn *Connection) prepareExecute(f func(future *Future) (*Response, error), expr string, args map[string]interface{}) (*Response, error) {
if preparedStatement, prepareErr := conn.sqlPreparedStatement(expr); prepareErr != nil {
return nil, prepareErr
} else {
executeReq := NewExecutePreparedRequest(preparedStatement).Args(args)
fut := conn.Do(executeReq)
r, err := f(fut)
if err != nil {
if ter, ok := err.(Error); ok &&
(ter.Code == ER_WRONG_QUERY_ID || (ter.Code == ER_SQL_EXECUTE && strings.Contains(ter.Msg, "statement has expired"))) {
conn.cacheMx.Lock()
conn.sqlPreparedStatementCache.Delete(expr)
conn.cacheMx.Unlock()
return conn.prepareExecute(f, expr, args)
}
}
return r, err
}
}

func (conn *Connection) PrepareExecute(sql string, args map[string]interface{}) (resp *Response, err error) {
return conn.prepareExecute(func(fut *Future) (*Response, error) { return fut.Get() }, sql, args)
}

func (conn *Connection) PrepareExecuteTyped(sql string, args map[string]interface{}, result interface{}) (err error) {
_, err = conn.prepareExecute(func(fut *Future) (*Response, error) { return nil, fut.GetTyped(result) }, sql, args)
return
}

func (conn *Connection) PrepareExecuteAsync(sql string, args map[string]interface{}) (resp *Future) {
_, _ = conn.prepareExecute(func(fut *Future) (*Response, error) { resp = fut; return nil, nil }, sql, args)
return
}
func (conn *Connection) sqlPreparedStatement(sql string) (prepared *Prepared, err error) {
if preparedIf, ok := conn.sqlPreparedStatementCache.Load(sql); ok {
prepared = preparedIf.(*Prepared)
} else {
conn.cacheMx.Lock()
defer conn.cacheMx.Unlock()
if preparedIf, ok = conn.sqlPreparedStatementCache.Load(sql); ok {
prepared = preparedIf.(*Prepared)
} else {
if prepared, err = conn.NewPrepared(sql); err == nil {
conn.sqlPreparedStatementCache.Store(sql, prepared)
}
}
}
return
}

// NewStream creates new Stream object for connection.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
Expand Down
Loading