forked from oceanbase/oceanbase
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathob_sql_task.h
More file actions
143 lines (126 loc) · 3 KB
/
ob_sql_task.h
File metadata and controls
143 lines (126 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SQL_OB_SQL_TASK_
#define OCEANBASE_SQL_OB_SQL_TASK_
#include "rpc/frame/ob_req_processor.h"
#include "observer/ob_srv_task.h"
namespace oceanbase {
namespace sql {
class ObSql;
class ObSqlTaskHandler : public rpc::frame::ObReqProcessor {
public:
ObSqlTaskHandler() : task_(NULL), sql_engine_(NULL)
{}
~ObSqlTaskHandler()
{}
int init(observer::ObSrvTask* task, ObSql* sql_engine);
void reset();
protected:
int deserialize()
{
return common::OB_SUCCESS;
}
int serialize()
{
return common::OB_SUCCESS;
}
int response(const int retcode)
{
UNUSED(retcode);
return common::OB_SUCCESS;
}
int process();
int after_process()
{
req_has_wokenup_ = true;
return common::OB_SUCCESS;
}
private:
DISALLOW_COPY_AND_ASSIGN(ObSqlTaskHandler);
observer::ObSrvTask* task_;
ObSql* sql_engine_;
};
class ObSqlTask : public observer::ObSrvTask {
friend class ObSqlTaskFactory;
public:
ObSqlTask() : msg_type_(0), size_(0), is_fixed_alloc_(false), handler_()
{}
~ObSqlTask()
{}
int init(const int msg_type, const ObReqTimestamp& req_ts, const char* buf, const int64_t size, ObSql* sql_engine);
void reset();
int get_msg_type() const
{
return msg_type_;
}
const char* get_buf() const
{
return buf_;
}
int64_t get_size() const
{
return size_;
}
rpc::frame::ObReqProcessor& get_processor()
{
return handler_;
}
const ObReqTimestamp& get_req_ts() const
{
return req_ts_;
}
TO_STRING_KV(KP(this), K_(msg_type));
public:
static const int64_t MAX_SQL_TASK_SIZE = 16 * 1024 - 128;
private:
void set_fixed_alloc()
{
is_fixed_alloc_ = true;
}
bool is_fixed_alloc() const
{
return is_fixed_alloc_;
}
private:
int msg_type_;
char buf_[MAX_SQL_TASK_SIZE];
int64_t size_;
bool is_fixed_alloc_;
ObSqlTaskHandler handler_;
ObReqTimestamp req_ts_;
};
class ObSqlTaskFactory {
public:
ObSqlTaskFactory()
{}
~ObSqlTaskFactory()
{
destroy();
}
int init();
void destroy();
public:
ObSqlTask* alloc(const uint64_t tenant_id);
void free(ObSqlTask* task);
static ObSqlTaskFactory& get_instance();
private:
ObSqlTask* alloc_(const uint64_t tenant_id);
void free_(ObSqlTask* task);
private:
static const int64_t NORMAL_FIXED_TASK_NUM = 4096;
static const int64_t MINI_FIXED_TASK_NUM = 128;
private:
ObFixedQueue<ObSqlTask> fixed_queue_;
};
} // namespace sql
} // namespace oceanbase
#endif // OCEANBASE_SQL_OB_SQL_TASK_