anope

- supernets anope source code & configuration
git clone git://git.acid.vegas/anope.git
Log | Files | Refs | Archive | README

m_mysql.cpp (14964B)

      1 /*
      2  *
      3  * (C) 2010-2022 Anope Team
      4  * Contact us at team@anope.org
      5  *
      6  * Please read COPYING and README for further details.
      7  */
      8 
      9 /* RequiredLibraries: mysqlclient */
     10 /* RequiredWindowsLibraries: libmysql */
     11 
     12 #include "module.h"
     13 #include "modules/sql.h"
     14 #define NO_CLIENT_LONG_LONG
     15 #ifdef WIN32
     16 # include <mysql.h>
     17 #else
     18 # include <mysql/mysql.h>
     19 #endif
     20 
     21 using namespace SQL;
     22 
/** Non-blocking threaded MySQL API, loosely based on InspIRCd's m_mysql.cpp
 *
 * This module spawns a single thread that is used to execute blocking MySQL queries.
 * When a module requests a query to be executed, it is added to a list for the thread
 * (which never stops looping and sleeping) to pick up and execute; the result is then
 * inserted into another queue to be picked up by the main thread. The main thread
 * uses Pipe to be notified through the socket engine when there are results waiting
 * to be sent back to the modules that requested the query.
 */
     32 
     33 class MySQLService;
     34 
     35 /** A query request
     36  */
     37 struct QueryRequest
     38 {
     39 	/* The connection to the database */
     40 	MySQLService *service;
     41 	/* The interface to use once we have the result to send the data back */
     42 	Interface *sqlinterface;
     43 	/* The actual query */
     44 	Query query;
     45 
     46 	QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { }
     47 };
     48 
     49 /** A query result */
     50 struct QueryResult
     51 {
     52 	/* The interface to send the data back on */
     53 	Interface *sqlinterface;
     54 	/* The result */
     55 	Result result;
     56 
     57 	QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { }
     58 };
     59 
     60 /** A MySQL result
     61  */
     62 class MySQLResult : public Result
     63 {
     64 	MYSQL_RES *res;
     65 
     66  public:
     67 	MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r)
     68 	{
     69 		unsigned num_fields = res ? mysql_num_fields(res) : 0;
     70 
     71 		/* It is not thread safe to log anything here using Log(this->owner) now :( */
     72 
     73 		if (!num_fields)
     74 			return;
     75 
     76 		for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
     77 		{
     78 			MYSQL_FIELD *fields = mysql_fetch_fields(res);
     79 
     80 			if (fields)
     81 			{
     82 				std::map<Anope::string, Anope::string> items;
     83 
     84 				for (unsigned field_count = 0; field_count < num_fields; ++field_count)
     85 				{
     86 					Anope::string column = (fields[field_count].name ? fields[field_count].name : "");
     87 					Anope::string data = (row[field_count] ? row[field_count] : "");
     88 
     89 					items[column] = data;
     90 				}
     91 
     92 				this->entries.push_back(items);
     93 			}
     94 		}
     95 	}
     96 
     97 	MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL)
     98 	{
     99 	}
    100 
    101 	~MySQLResult()
    102 	{
    103 		if (this->res)
    104 			mysql_free_result(this->res);
    105 	}
    106 };
    107 
/** A MySQL connection, there can be multiple
 *
 * Queries can be queued for the dispatcher thread with Run(), or executed
 * directly in the calling thread with RunQuery().
 */
class MySQLService : public Provider
{
	/* Columns known to exist per table name; filled in by CreateTable()
	 * and consulted by BuildInsert()
	 */
	std::map<Anope::string, std::set<Anope::string> > active_schema;

	/* Connection parameters handed to mysql_real_connect() in Connect() */
	Anope::string database;
	Anope::string server;
	Anope::string user;
	Anope::string password;
	int port;

	/* The client library connection handle */
	MYSQL *sql;

	/** Escape a query.
	 * Note the mutex must be held!
	 */
	Anope::string Escape(const Anope::string &query);

 public:
	/* Locked by the SQL thread when a query is pending on this database,
	 * prevents us from deleting a connection while a query is executing
	 * in the thread
	 */
	Mutex Lock;

	/* Stores the connection parameters and connects immediately; Connect()
	 * throws SQL::Exception when the connection cannot be established
	 */
	MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po);

	/* Closes the connection and drops the queued requests of this service */
	~MySQLService();

	/* Queue a query for the dispatcher thread and wake the thread up */
	void Run(Interface *i, const Query &query) anope_override;

	/* Execute a query in the calling thread, holding Lock while it runs */
	Result RunQuery(const Query &query) anope_override;

	/* Build the CREATE TABLE / ALTER TABLE queries needed so that the table
	 * has a column for every key of data
	 */
	std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override;

	/* Build an INSERT ... ON DUPLICATE KEY UPDATE query for the given row */
	Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override;

	/* Build a SHOW TABLES query for tables starting with prefix */
	Query GetTables(const Anope::string &prefix) anope_override;

	/* (Re)connect to the server, throws SQL::Exception on failure */
	void Connect();

	/* Ping the server and reconnect if needed; false if reconnecting failed */
	bool CheckConnection();

	/* Substitute the @name@ placeholders of q with its parameter values */
	Anope::string BuildQuery(const Query &q);

	/* Return a FROM_UNIXTIME(...) SQL expression for the given timestamp */
	Anope::string FromUnixtime(time_t);
};
    156 
    157 /** The SQL thread used to execute queries
    158  */
    159 class DispatcherThread : public Thread, public Condition
    160 {
    161  public:
    162 	DispatcherThread() : Thread() { }
    163 
    164 	void Run() anope_override;
    165 };
    166 
    167 class ModuleSQL;
    168 static ModuleSQL *me;
    169 class ModuleSQL : public Module, public Pipe
    170 {
    171 	/* SQL connections */
    172 	std::map<Anope::string, MySQLService *> MySQLServices;
    173  public:
    174 	/* Pending query requests */
    175 	std::deque<QueryRequest> QueryRequests;
    176 	/* Pending finished requests with results */
    177 	std::deque<QueryResult> FinishedRequests;
    178 	/* The thread used to execute queries */
    179 	DispatcherThread *DThread;
    180 
    181 	ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
    182 	{
    183 		me = this;
    184 
    185 
    186 		DThread = new DispatcherThread();
    187 		DThread->Start();
    188 	}
    189 
    190 	~ModuleSQL()
    191 	{
    192 		for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it)
    193 			delete it->second;
    194 		MySQLServices.clear();
    195 
    196 		DThread->SetExitState();
    197 		DThread->Wakeup();
    198 		DThread->Join();
    199 		delete DThread;
    200 	}
    201 
    202 	void OnReload(Configuration::Conf *conf) anope_override
    203 	{
    204 		Configuration::Block *config = conf->GetModule(this);
    205 
    206 		for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();)
    207 		{
    208 			const Anope::string &cname = it->first;
    209 			MySQLService *s = it->second;
    210 			int i;
    211 
    212 			++it;
    213 
    214 			for (i = 0; i < config->CountBlock("mysql"); ++i)
    215 				if (config->GetBlock("mysql", i)->Get<const Anope::string>("name", "mysql/main") == cname)
    216 					break;
    217 
    218 			if (i == config->CountBlock("mysql"))
    219 			{
    220 				Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname;
    221 
    222 				delete s;
    223 				this->MySQLServices.erase(cname);
    224 			}
    225 		}
    226 
    227 		for (int i = 0; i < config->CountBlock("mysql"); ++i)
    228 		{
    229 			Configuration::Block *block = config->GetBlock("mysql", i);
    230 			const Anope::string &connname = block->Get<const Anope::string>("name", "mysql/main");
    231 
    232 			if (this->MySQLServices.find(connname) == this->MySQLServices.end())
    233 			{
    234 				const Anope::string &database = block->Get<const Anope::string>("database", "anope");
    235 				const Anope::string &server = block->Get<const Anope::string>("server", "127.0.0.1");
    236 				const Anope::string &user = block->Get<const Anope::string>("username", "anope");
    237 				const Anope::string &password = block->Get<const Anope::string>("password");
    238 				int port = block->Get<int>("port", "3306");
    239 
    240 				try
    241 				{
    242 					MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port);
    243 					this->MySQLServices.insert(std::make_pair(connname, ss));
    244 
    245 					Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")";
    246 				}
    247 				catch (const SQL::Exception &ex)
    248 				{
    249 					Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason();
    250 				}
    251 			}
    252 		}
    253 	}
    254 
    255 	void OnModuleUnload(User *, Module *m) anope_override
    256 	{
    257 		this->DThread->Lock();
    258 
    259 		for (unsigned i = this->QueryRequests.size(); i > 0; --i)
    260 		{
    261 			QueryRequest &r = this->QueryRequests[i - 1];
    262 
    263 			if (r.sqlinterface && r.sqlinterface->owner == m)
    264 			{
    265 				if (i == 1)
    266 				{
    267 					r.service->Lock.Lock();
    268 					r.service->Lock.Unlock();
    269 				}
    270 
    271 				this->QueryRequests.erase(this->QueryRequests.begin() + i - 1);
    272 			}
    273 		}
    274 
    275 		this->DThread->Unlock();
    276 
    277 		this->OnNotify();
    278 	}
    279 
    280 	void OnNotify() anope_override
    281 	{
    282 		this->DThread->Lock();
    283 		std::deque<QueryResult> finishedRequests = this->FinishedRequests;
    284 		this->FinishedRequests.clear();
    285 		this->DThread->Unlock();
    286 
    287 		for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
    288 		{
    289 			const QueryResult &qr = *it;
    290 
    291 			if (!qr.sqlinterface)
    292 				throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?");
    293 
    294 			if (qr.result.GetError().empty())
    295 				qr.sqlinterface->OnResult(qr.result);
    296 			else
    297 				qr.sqlinterface->OnError(qr.result);
    298 		}
    299 	}
    300 };
    301 
    302 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
    303 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL)
    304 {
    305 	Connect();
    306 }
    307 
    308 MySQLService::~MySQLService()
    309 {
    310 	me->DThread->Lock();
    311 	this->Lock.Lock();
    312 	mysql_close(this->sql);
    313 	this->sql = NULL;
    314 
    315 	for (unsigned i = me->QueryRequests.size(); i > 0; --i)
    316 	{
    317 		QueryRequest &r = me->QueryRequests[i - 1];
    318 
    319 		if (r.service == this)
    320 		{
    321 			if (r.sqlinterface)
    322 				r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away"));
    323 			me->QueryRequests.erase(me->QueryRequests.begin() + i - 1);
    324 		}
    325 	}
    326 	this->Lock.Unlock();
    327 	me->DThread->Unlock();
    328 }
    329 
    330 void MySQLService::Run(Interface *i, const Query &query)
    331 {
    332 	me->DThread->Lock();
    333 	me->QueryRequests.push_back(QueryRequest(this, i, query));
    334 	me->DThread->Unlock();
    335 	me->DThread->Wakeup();
    336 }
    337 
    338 Result MySQLService::RunQuery(const Query &query)
    339 {
    340 	this->Lock.Lock();
    341 
    342 	Anope::string real_query = this->BuildQuery(query);
    343 
    344 	if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length()))
    345 	{
    346 		MYSQL_RES *res = mysql_store_result(this->sql);
    347 		unsigned int id = mysql_insert_id(this->sql);
    348 
    349 		/* because we enabled CLIENT_MULTI_RESULTS in our options
    350 		 * a multiple statement or a procedure call can return
    351 		 * multiple result sets.
    352 		 * we must process them all before the next query.
    353 		 */
    354 
    355 		while (!mysql_next_result(this->sql))
    356 			mysql_free_result(mysql_store_result(this->sql));
    357 
    358 		this->Lock.Unlock();
    359 		return MySQLResult(id, query, real_query, res);
    360 	}
    361 	else
    362 	{
    363 		Anope::string error = mysql_error(this->sql);
    364 		this->Lock.Unlock();
    365 		return MySQLResult(query, real_query, error);
    366 	}
    367 }
    368 
    369 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data)
    370 {
    371 	std::vector<Query> queries;
    372 	std::set<Anope::string> &known_cols = this->active_schema[table];
    373 
    374 	if (known_cols.empty())
    375 	{
    376 		Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table;
    377 
    378 		Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`");
    379 		for (int i = 0; i < columns.Rows(); ++i)
    380 		{
    381 			const Anope::string &column = columns.Get(i, "Field");
    382 
    383 			Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column;
    384 			known_cols.insert(column);
    385 		}
    386 	}
    387 
    388 	if (known_cols.empty())
    389 	{
    390 		Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
    391 			" `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP";
    392 		for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    393 		{
    394 			known_cols.insert(it->first);
    395 
    396 			query_text += ", `" + it->first + "` ";
    397 			if (data.GetType(it->first) == Serialize::Data::DT_INT)
    398 				query_text += "int(11)";
    399 			else
    400 				query_text += "text";
    401 		}
    402 		query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))";
    403 		queries.push_back(query_text);
    404 	}
    405 	else
    406 		for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    407 		{
    408 			if (known_cols.count(it->first) > 0)
    409 				continue;
    410 
    411 			known_cols.insert(it->first);
    412 
    413 			Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` ";
    414 			if (data.GetType(it->first) == Serialize::Data::DT_INT)
    415 				query_text += "int(11)";
    416 			else
    417 				query_text += "text";
    418 
    419 			queries.push_back(query_text);
    420 		}
    421 
    422 	return queries;
    423 }
    424 
    425 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data)
    426 {
    427 	/* Empty columns not present in the data set */
    428 	const std::set<Anope::string> &known_cols = this->active_schema[table];
    429 	for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it)
    430 		if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0)
    431 			data[*it] << "";
    432 
    433 	Anope::string query_text = "INSERT INTO `" + table + "` (`id`";
    434 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    435 		query_text += ",`" + it->first + "`";
    436 	query_text += ") VALUES (" + stringify(id);
    437 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    438 		query_text += ",@" + it->first + "@";
    439 	query_text += ") ON DUPLICATE KEY UPDATE ";
    440 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    441 		query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),";
    442 	query_text.erase(query_text.end() - 1);
    443 
    444 	Query query(query_text);
    445 	for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    446 	{
    447 		Anope::string buf;
    448 		*it->second >> buf;
    449 
    450 		bool escape = true;
    451 		if (buf.empty())
    452 		{
    453 			buf = "NULL";
    454 			escape = false;
    455 		}
    456 
    457 		query.SetValue(it->first, buf, escape);
    458 	}
    459 
    460 	return query;
    461 }
    462 
    463 Query MySQLService::GetTables(const Anope::string &prefix)
    464 {
    465 	return Query("SHOW TABLES LIKE '" + prefix + "%';");
    466 }
    467 
    468 void MySQLService::Connect()
    469 {
    470 	this->sql = mysql_init(this->sql);
    471 
    472 	const unsigned int timeout = 1;
    473 	mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout));
    474 
    475 	bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS);
    476 
    477 	if (!connect)
    478 		throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql));
    479 
    480 	Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port;
    481 }
    482 
    483 
    484 bool MySQLService::CheckConnection()
    485 {
    486 	if (!this->sql || mysql_ping(this->sql))
    487 	{
    488 		try
    489 		{
    490 			this->Connect();
    491 		}
    492 		catch (const SQL::Exception &)
    493 		{
    494 			return false;
    495 		}
    496 	}
    497 
    498 	return true;
    499 }
    500 
    501 Anope::string MySQLService::Escape(const Anope::string &query)
    502 {
    503 	std::vector<char> buffer(query.length() * 2 + 1);
    504 	mysql_real_escape_string(this->sql, &buffer[0], query.c_str(), query.length());
    505 	return &buffer[0];
    506 }
    507 
    508 Anope::string MySQLService::BuildQuery(const Query &q)
    509 {
    510 	Anope::string real_query = q.query;
    511 
    512 	for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it)
    513 		real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data));
    514 
    515 	return real_query;
    516 }
    517 
    518 Anope::string MySQLService::FromUnixtime(time_t t)
    519 {
    520 	return "FROM_UNIXTIME(" + stringify(t) + ")";
    521 }
    522 
    523 void DispatcherThread::Run()
    524 {
    525 	this->Lock();
    526 
    527 	while (!this->GetExitState())
    528 	{
    529 		if (!me->QueryRequests.empty())
    530 		{
    531 			QueryRequest &r = me->QueryRequests.front();
    532 			this->Unlock();
    533 
    534 			Result sresult = r.service->RunQuery(r.query);
    535 
    536 			this->Lock();
    537 			if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query)
    538 			{
    539 				if (r.sqlinterface)
    540 					me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult));
    541 				me->QueryRequests.pop_front();
    542 			}
    543 		}
    544 		else
    545 		{
    546 			if (!me->FinishedRequests.empty())
    547 				me->Notify();
    548 			this->Wait();
    549 		}
    550 	}
    551 
    552 	this->Unlock();
    553 }
    554 
/* Module entry point: registers ModuleSQL as the class implementing this module */
MODULE_INIT(ModuleSQL)