diff --git a/plugins/sql.c b/plugins/sql.c index 24c3b28c5a40..cd0a52200d9f 100644 --- a/plugins/sql.c +++ b/plugins/sql.c @@ -2,6 +2,7 @@ #include "config.h" #include #include +#include #include #include #include @@ -24,8 +25,6 @@ static const char schemas[] = ; /* TODO: - * 2. Refresh time in API. - * 8. time_msec fields. * 10. General pagination API (not just chainmoves and channelmoves) * 11. Normalize account_id fields into another table, as they are highly duplicate, and use views to maintain the current API. */ @@ -93,8 +92,6 @@ struct db_query { struct table_desc **tables; const char *authfail; bool has_wildcard; - /* Update *last_created_index */ - u64 *last_created_index; }; /* Waiting for another command to refresh table */ @@ -104,6 +101,20 @@ struct refresh_waiter { struct db_query *dbq; }; +enum refresh_needs { + /* Naive tables always need refresh */ + REFRESH_ALWAYS = 0x8, + + /* Up-to-date! */ + REFRESH_UNNECESSARY = 0, + /* We were notified of new created entries */ + REFRESH_CREATED = 0x1, + /* We were notified of new updated entries */ + REFRESH_UPDATED = 0x2, + /* We were notified of new deleted entries */ + REFRESH_DELETED = 0x4, +}; + struct table_desc { /* e.g. listpeers. For sub-tables, the raw name without * parent prepended */ @@ -115,23 +126,26 @@ struct table_desc { /* name if we need to wait for changes */ const char *waitname; struct column **columns; - char *update_stmt; + const char *insert_stmt; + /* If we have create_index, we can delete by it */ + const char *delete_stmt; /* If we are a subtable */ struct table_desc *parent; /* Is this a sub object (otherwise, subarray if parent is true) */ bool is_subobject; /* Do we use created_index as primary key? Otherwise we create rowid. */ bool has_created_index; - /* Have we created our sql indexes yet? */ - bool indices_created; + /* Have we ever been used? */ + bool populated; /* function to refresh it. */ struct command_result *(*refresh)(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); /* some refresh functions maintain changed and created indexes */ u64 last_created_index; + u64 last_updated_index; /* Do we need a refresh? */ - bool needs_refresh; + enum refresh_needs refresh_needs; /* Are we refreshing now? */ bool refreshing; /* When did we start refreshing? */ @@ -148,6 +162,9 @@ struct sql { int gosstore_fd ; size_t gosstore_nodes_off, gosstore_channels_off; u64 next_rowid; + + /* This is an aux_command for all our watches */ + struct command *waitcmd; }; static struct sql *sql_of(struct plugin *plugin) @@ -547,16 +564,7 @@ static struct command_result *next_refresh(struct command *cmd, return refresh_tables(cmd, dbq); } -static struct command_result *wait_done(struct command *cmd, - const char *method, - const char *buf, - const jsmntok_t *result, - struct table_desc *td) -{ - td->needs_refresh = true; - return aux_command_done(cmd); -} - +/* Recursion */ static struct command_result *one_refresh_done(struct command *cmd, struct db_query *dbq, bool was_limited) @@ -580,9 +588,11 @@ static struct command_result *one_refresh_done(struct command *cmd, (u64)refresh_duration.ts.tv_nsec, td->last_created_index); - if (!td->indices_created) { + if (!td->populated) { + /* Now we've done initial population, install indices: + * much faster than creating them before! */ init_indices(cmd->plugin, td); - td->indices_created = 1; + td->populated = true; refresh_duration = timemono_since(td->refresh_start); plugin_log(cmd->plugin, LOG_DBG, "Time to refresh + create indices for %s: %"PRIu64".%09"PRIu64" seconds", @@ -591,18 +601,6 @@ static struct command_result *one_refresh_done(struct command *cmd, (u64)refresh_duration.ts.tv_nsec); } - /* We put in a wait command to we get told when we need to refresh */ - if (td->waitname) { - struct out_req *req; - td->needs_refresh = false; - req = jsonrpc_request_start(aux_command(cmd), "wait", wait_done, - plugin_broken_cb, td); - json_add_string(req->js, "subsystem", td->waitname); - json_add_string(req->js, "indexname", "created"); - json_add_u64(req->js, "nextvalue", td->last_created_index+1); - send_outreq(req); - } - /* Transfer refresh waiters onto local list */ list_head_init(&waiters); list_append_list(&waiters, &td->refresh_waiters); @@ -626,7 +624,9 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *rowid, const struct table_desc *td, - u64 *last_created_index); + bool update, + u64 *last_created_index, + u64 *last_updated_index); /* Process all subobject columns */ static struct command_result *process_json_subobjs(struct command *cmd, @@ -634,7 +634,8 @@ static struct command_result *process_json_subobjs(struct command *cmd, const jsmntok_t *t, const struct table_desc *td, u64 this_rowid, - u64 *last_created_index) + u64 *last_created_index, + u64 *last_updated_index) { for (size_t i = 0; i < tal_count(td->columns); i++) { const struct column *col = td->columns[i]; @@ -651,10 +652,11 @@ static struct command_result *process_json_subobjs(struct command *cmd, /* If it's an array, use process_json_list */ if (!col->sub->is_subobject) { ret = process_json_list(cmd, buf, coltok, &this_rowid, - col->sub, last_created_index); + col->sub, false, + last_created_index, last_updated_index); } else { ret = process_json_subobjs(cmd, buf, coltok, col->sub, - this_rowid, last_created_index); + this_rowid, last_created_index, last_updated_index); } if (ret) return ret; @@ -672,7 +674,8 @@ static struct command_result *process_json_obj(struct command *cmd, const u64 *parent_rowid, size_t *sqloff, sqlite3_stmt *stmt, - u64 *last_created_index) + u64 *last_created_index, + u64 *last_updated_index) { struct sql *sql = sql_of(cmd->plugin); int err; @@ -700,7 +703,7 @@ static struct command_result *process_json_obj(struct command *cmd, else coltok = json_get_member(buf, t, col->jsonname); ret = process_json_obj(cmd, buf, coltok, col->sub, row, this_rowid, - NULL, sqloff, stmt, last_created_index); + NULL, sqloff, stmt, last_created_index, last_updated_index); if (ret) return ret; continue; @@ -750,6 +753,11 @@ static struct command_result *process_json_obj(struct command *cmd, && val64 > *last_created_index) { *last_created_index = val64; } + /* updated_index -> last_updated_index */ + if (streq(col->dbname, "updated_index") + && val64 > *last_updated_index) { + *last_updated_index = val64; + } break; case FIELD_BOOL: if (!json_to_bool(buf, coltok, &valbool)) { @@ -816,12 +824,12 @@ static struct command_result *process_json_obj(struct command *cmd, if (err != SQLITE_DONE) { return command_fail(cmd, LIGHTNINGD, "Error executing %s on row %zu: %s", - td->update_stmt, + td->insert_stmt, row, sqlite3_errmsg(sql->db)); } - return process_json_subobjs(cmd, buf, t, td, this_rowid, last_created_index); + return process_json_subobjs(cmd, buf, t, td, this_rowid, last_created_index, last_updated_index); } /* A list, such as in the top-level reply, or for a sub-table */ @@ -830,22 +838,36 @@ static struct command_result *process_json_list(struct command *cmd, const jsmntok_t *arr, const u64 *parent_rowid, const struct table_desc *td, - u64 *last_created_index) + bool update, + u64 *last_created_index, + u64 *last_updated_index) { struct sql *sql = sql_of(cmd->plugin); size_t i; const jsmntok_t *t; int err; - sqlite3_stmt *stmt; + sqlite3_stmt *insert_stmt, *delete_stmt; struct command_result *ret = NULL; - err = sqlite3_prepare_v2(sql->db, td->update_stmt, -1, &stmt, NULL); + err = sqlite3_prepare_v2(sql->db, td->insert_stmt, -1, &insert_stmt, NULL); if (err != SQLITE_OK) { return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", - td->update_stmt, + td->insert_stmt, sqlite3_errmsg(sql->db)); } + /* Updating? Delete any previous record */ + if (update) { + err = sqlite3_prepare_v2(sql->db, td->delete_stmt, -1, &delete_stmt, NULL); + if (err != SQLITE_OK) { + return command_fail(cmd, LIGHTNINGD, "preparing '%s' failed: %s", + td->delete_stmt, + sqlite3_errmsg(sql->db)); + } + } else { + delete_stmt = NULL; + } + json_for_each_arr(i, t, arr) { /* sqlite3 columns are 1-based! */ size_t off = 1; @@ -854,7 +876,8 @@ static struct command_result *process_json_list(struct command *cmd, if (!td->has_created_index) { this_rowid = sql->next_rowid++; /* First entry is always the rowid */ - sqlite3_bind_int64(stmt, off++, this_rowid); + sqlite3_bind_int64(insert_stmt, off++, this_rowid); + assert(!delete_stmt); } else { if (!json_to_u64(buf, json_get_member(buf, t, "created_index"), @@ -863,35 +886,63 @@ static struct command_result *process_json_list(struct command *cmd, td->cmdname, json_tok_full_len(t), json_tok_full(buf, t)); + + /* For updates, we simply delete old entry: this + * cascades to subtables. We ignore updates on + * entries we don't have yet, too. */ + if (delete_stmt) { + if (this_rowid > td->last_created_index) + continue; + sqlite3_bind_int64(delete_stmt, 1, this_rowid); + err = sqlite3_step(delete_stmt); + if (err != SQLITE_DONE) { + return command_fail(cmd, LIGHTNINGD, + "Error executing %s on id %"PRIu64": %s", + td->delete_stmt, this_rowid, + sqlite3_errmsg(sql->db)); + } + sqlite3_reset(delete_stmt); + } } - ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, stmt, last_created_index); + ret = process_json_obj(cmd, buf, t, td, i, this_rowid, parent_rowid, &off, insert_stmt, last_created_index, last_updated_index); if (ret) break; - sqlite3_reset(stmt); + sqlite3_reset(insert_stmt); } - sqlite3_finalize(stmt); + sqlite3_finalize(insert_stmt); + if (delete_stmt) + sqlite3_finalize(delete_stmt); + return ret; } -/* Process top-level JSON result object */ +/* Process top-level JSON result object. + * If update is true, ignore entries > td->last_created_index, and + * delete before insert. + * Put the maximum processed created_index in *last_created_index. + * Put the number of entries in *num_entries, if not NULL; + */ static struct command_result *process_json_result(struct command *cmd, const char *buf, const jsmntok_t *result, const struct table_desc *td, + bool update, u64 *last_created_index, + u64 *last_updated_index, size_t *num_entries) { const jsmntok_t *arr; struct timerel so_far = timemono_since(td->refresh_start); plugin_log(cmd->plugin, LOG_DBG, - "Time to call %s: %"PRIu64".%09"PRIu64" seconds", - td->cmdname, + "Time to call %s%s: %"PRIu64".%09"PRIu64" seconds", + td->cmdname, update ? " (updates)" : "", (u64)so_far.ts.tv_sec, (u64)so_far.ts.tv_nsec); arr = json_get_member(buf, result, td->arrname); if (num_entries) *num_entries = arr->size; - return process_json_list(cmd, buf, arr, NULL, td, last_created_index); + return process_json_list(cmd, buf, arr, NULL, td, update, + last_created_index, last_updated_index); } static struct command_result *default_list_done(struct command *cmd, @@ -901,7 +952,7 @@ static struct command_result *default_list_done(struct command *cmd, struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; int err; char *errmsg; @@ -914,7 +965,8 @@ static struct command_result *default_list_done(struct command *cmd, td->name, errmsg); } - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -922,7 +974,7 @@ static struct command_result *default_list_done(struct command *cmd, } static struct command_result *default_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct out_req *req; @@ -985,7 +1037,7 @@ static void delete_channel_from_db(struct command *cmd, } static struct command_result *channels_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); static struct command_result *listchannels_one_done(struct command *cmd, @@ -994,10 +1046,11 @@ static struct command_result *listchannels_one_done(struct command *cmd, const jsmntok_t *result, struct db_query *dbq) { - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1006,8 +1059,8 @@ static struct command_result *listchannels_one_done(struct command *cmd, } static struct command_result *channels_refresh(struct command *cmd, - const struct table_desc *td, - struct db_query *dbq) + struct table_desc *td, + struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); struct out_req *req; @@ -1086,7 +1139,7 @@ static struct command_result *channels_refresh(struct command *cmd, } static struct command_result *nodes_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); static struct command_result *listnodes_one_done(struct command *cmd, @@ -1095,10 +1148,11 @@ static struct command_result *listnodes_one_done(struct command *cmd, const jsmntok_t *result, struct db_query *dbq) { - const struct table_desc *td = dbq->tables[0]; + struct table_desc *td = dbq->tables[0]; struct command_result *ret; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, NULL); + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, &td->last_updated_index, NULL); if (ret) return ret; @@ -1154,7 +1208,7 @@ static bool extract_node_id(int gosstore_fd, size_t off, u16 type, } static struct command_result *nodes_refresh(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct sql *sql = sql_of(cmd->plugin); @@ -1217,6 +1271,102 @@ static struct command_result *nodes_refresh(struct command *cmd, return one_refresh_done(cmd, dbq, false); } +/* Mutual recursion */ +static void watch_for(struct sql *sql, + struct table_desc *td, + const char *indexname, + u64 next_index); + +static struct command_result *wait_done(struct command *auxcmd, + const char *method, + const char *buf, + const jsmntok_t *result, + struct table_desc *td) +{ + const jsmntok_t *valtok; + const char *indexname; + u64 val; + + if ((valtok = json_get_member(buf, result, "created")) != NULL) { + indexname = "created"; + td->refresh_needs |= REFRESH_CREATED; + } else if ((valtok = json_get_member(buf, result, "updated")) != NULL) { + indexname = "updated"; + td->refresh_needs |= REFRESH_UPDATED; + } else if ((valtok = json_get_member(buf, result, "deleted")) != NULL) { + indexname = "deleted"; + td->refresh_needs |= REFRESH_DELETED; + } else { + plugin_err(auxcmd->plugin, + "Invalid wait_done for %s: '%.*s'", + td->name, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + if (!json_to_u64(buf, valtok, &val)) { + plugin_err(auxcmd->plugin, + "Invalid wait_done index for %s: '%.*s'", + td->name, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + /* Keep watching for next one */ + watch_for(sql_of(auxcmd->plugin), td, indexname, val + 1); + return command_still_pending(auxcmd); +} + +static void watch_for(struct sql *sql, + struct table_desc *td, + const char *indexname, + u64 next_index) +{ + struct out_req *req; + + req = jsonrpc_request_start(sql->waitcmd, "wait", wait_done, + plugin_broken_cb, td); + json_add_string(req->js, "subsystem", td->waitname); + json_add_string(req->js, "indexname", indexname); + json_add_u64(req->js, "nextvalue", next_index); + send_outreq(req); +} + +/* First time we initialize counters and figure where we're up to */ +static void watch_init(struct command *cmd, + struct table_desc *td, + const char *indexname, + u64 *max) +{ + struct json_out *params = json_out_new(NULL); + const jsmntok_t *result, *valtok; + const char *buf; + u64 val; + + json_out_start(params, NULL, '{'); + json_out_addstr(params, "subsystem", td->waitname); + json_out_addstr(params, "indexname", indexname); + json_out_add(params, "nextvalue", false, "0"); + json_out_end(params, '}'); + + result = jsonrpc_request_sync(tmpctx, cmd, "wait", take(params), &buf); + + valtok = json_get_member(buf, result, indexname); + if (!valtok || !json_to_u64(buf, valtok, &val)) { + plugin_err(cmd->plugin, + "Invalid wait reply for %s %s: '%.*s'", + td->name, indexname, + json_tok_full_len(result), + json_tok_full(buf, result)); + } + + if (max != NULL) + *max = val; + + /* Place watch for when it increases */ + watch_for(sql_of(cmd->plugin), td, indexname, val + 1); +} + static struct command_result *refresh_tables(struct command *cmd, struct db_query *dbq) { @@ -1225,8 +1375,6 @@ static struct command_result *refresh_tables(struct command *cmd, if (tal_count(dbq->tables) == 0) return refresh_complete(cmd, dbq); - /* td is const, but last_created_index needs updating, so we hand - * pointer in dbq. */ td = dbq->tables[0]; /* If it's currently being refreshed, wait */ @@ -1238,12 +1386,20 @@ static struct command_result *refresh_tables(struct command *cmd, return command_still_pending(cmd); } - if (!td->needs_refresh) + if (td->refresh_needs == REFRESH_UNNECESSARY) return next_refresh(cmd, dbq); - dbq->last_created_index = &dbq->tables[0]->last_created_index; td->refreshing = true; td->refresh_start = time_mono(); + + /* The first time, we may need to install watches */ + if (!td->populated && td->waitname) { + /* We will initialize td->last_created_index as we read them in */ + watch_init(cmd, td, "created", NULL); + watch_init(cmd, td, "updated", &td->last_updated_index); + watch_init(cmd, td, "deleted", NULL); + } + return td->refresh(cmd, dbq->tables[0], dbq); } @@ -1410,7 +1566,7 @@ static struct command_result *json_listsqlschemas(struct command *cmd, } /* Adds a sub_object to this sql statement (and sub-sub etc) */ -static void add_sub_object(char **update_stmt, char **create_stmt, +static void add_sub_object(char **insert_stmt, char **create_stmt, const char **sep, struct table_desc *sub) { /* sub-arrays are a completely separate table. */ @@ -1422,11 +1578,11 @@ static void add_sub_object(char **update_stmt, char **create_stmt, const struct column *subcol = sub->columns[j]; if (subcol->sub) { - add_sub_object(update_stmt, create_stmt, sep, + add_sub_object(insert_stmt, create_stmt, sep, subcol->sub); continue; } - tal_append_fmt(update_stmt, "%s?", *sep); + tal_append_fmt(insert_stmt, "%s?", *sep); tal_append_fmt(create_stmt, "%s%s %s", *sep, subcol->dbname, @@ -1450,7 +1606,7 @@ static const char *primary_key_name(const struct table_desc *td) static void finish_td(struct plugin *plugin, struct table_desc *td) { struct sql *sql = sql_of(plugin); - char *create_stmt; + char *create_stmt, *insert_stmt; int err; char *errmsg; const char *sep = ""; @@ -1461,11 +1617,14 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) goto do_subtables; create_stmt = tal_fmt(tmpctx, "CREATE TABLE %s (", td->name); - td->update_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); + insert_stmt = tal_fmt(td, "INSERT INTO %s VALUES (", td->name); /* If no created_index, create explicit rowid */ if (!td->has_created_index) { tal_append_fmt(&create_stmt, "rowid INTEGER PRIMARY KEY, "); - tal_append_fmt(&td->update_stmt, "?, "); + tal_append_fmt(&insert_stmt, "?, "); + td->delete_stmt = NULL; + } else { + td->delete_stmt = tal_fmt(td, "DELETE FROM %s WHERE created_index = ?;", td->name); } /* If we're a child array, we reference the parent column */ @@ -1478,7 +1637,7 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) "row INTEGER REFERENCES %s(%s) ON DELETE CASCADE," " arrindex INTEGER", parent->name, primary_key_name(parent)); - tal_append_fmt(&td->update_stmt, "?,?"); + tal_append_fmt(&insert_stmt, "?,?"); sep = ","; } @@ -1486,11 +1645,11 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) const struct column *col = td->columns[i]; if (col->sub) { - add_sub_object(&td->update_stmt, &create_stmt, + add_sub_object(&insert_stmt, &create_stmt, &sep, col->sub); continue; } - tal_append_fmt(&td->update_stmt, "%s?", sep); + tal_append_fmt(&insert_stmt, "%s?", sep); tal_append_fmt(&create_stmt, "%s%s %s", sep, col->dbname, @@ -1501,7 +1660,8 @@ static void finish_td(struct plugin *plugin, struct table_desc *td) sep = ","; } tal_append_fmt(&create_stmt, ");"); - tal_append_fmt(&td->update_stmt, ");"); + tal_append_fmt(&insert_stmt, ");"); + td->insert_stmt = insert_stmt; err = sqlite3_exec(sql->db, create_stmt, NULL, NULL, &errmsg); if (err != SQLITE_OK) @@ -1567,7 +1727,9 @@ static struct command_result *limited_list_done(struct command *cmd, struct command_result *ret; size_t num_entries; - ret = process_json_result(cmd, buf, result, td, dbq->last_created_index, + ret = process_json_result(cmd, buf, result, td, false, + &td->last_created_index, + &td->last_updated_index, &num_entries); if (ret) return ret; @@ -1578,23 +1740,85 @@ static struct command_result *limited_list_done(struct command *cmd, /* The simplest case: append-only lists */ static struct command_result *refresh_by_created_index(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq) { struct out_req *req; + + /* Since we're relying on watches, mark refreshing unnecessary to start */ + assert(td->refresh_needs != REFRESH_UNNECESSARY); + td->refresh_needs = REFRESH_UNNECESSARY; + req = jsonrpc_request_start(cmd, td->cmdname, limited_list_done, forward_error, dbq); json_add_string(req->js, "index", "created"); - json_add_u64(req->js, "start", *dbq->last_created_index + 1); + json_add_u64(req->js, "start", td->last_created_index + 1); json_add_u64(req->js, "limit", LIMIT_PER_LIST); return send_outreq(req); } +static struct command_result *updated_list_done(struct command *cmd, + const char *method, + const char *buf, + const jsmntok_t *result, + struct db_query *dbq) +{ + struct table_desc *td = dbq->tables[0]; + struct command_result *ret; + u64 unused = 0; + + /* We don't care what max created_index it processes. */ + ret = process_json_result(cmd, buf, result, td, true, &unused, &td->last_updated_index, + NULL); + if (ret) + return ret; + + /* Now we can process any new ones */ + if (td->refresh_needs & REFRESH_CREATED) { + plugin_log(cmd->plugin, LOG_DBG, + "%s: records created, inserting from %"PRIu64, td->name, td->last_created_index + 1); + return refresh_by_created_index(cmd, td, dbq); + } + + td->refresh_needs = REFRESH_UNNECESSARY; + return one_refresh_done(cmd, dbq, false); +} + +static struct command_result *paginated_refresh(struct command *cmd, + struct table_desc *td, + struct db_query *dbq) +{ + /* In case something was deleted (rare!) we just reload the + * entire thing */ + if (td->refresh_needs & REFRESH_DELETED) { + plugin_log(cmd->plugin, LOG_DBG, "%s: total reload due to delete", td->name); + td->refresh_needs = REFRESH_UNNECESSARY; + return default_refresh(cmd, td, dbq); + } + + if (td->refresh_needs & REFRESH_UPDATED) { + struct out_req *req; + plugin_log(cmd->plugin, LOG_DBG, + "%s: records updated, updating from %"PRIu64, td->name, td->last_updated_index + 1); + req = jsonrpc_request_start(cmd, td->cmdname, + updated_list_done, forward_error, + dbq); + json_add_string(req->js, "index", "updated"); + json_add_u64(req->js, "start", td->last_updated_index + 1); + return send_outreq(req); + } + + /* No updates, no deletes: the simple case! */ + plugin_log(cmd->plugin, LOG_DBG, + "%s: records created, inserting from %"PRIu64, td->name, td->last_created_index + 1); + return refresh_by_created_index(cmd, td, dbq); +} + struct refresh_funcs { const char *cmdname; struct command_result *(*refresh)(struct command *cmd, - const struct table_desc *td, + struct table_desc *td, struct db_query *dbq); const char *waitname; }; @@ -1603,11 +1827,12 @@ static const struct refresh_funcs refresh_funcs[] = { /* These are special, using gossmap */ { "listchannels", channels_refresh, NULL }, { "listnodes", nodes_refresh, NULL }, - /* FIXME: These support wait and full pagination, but we need to watch for deletes, too! */ - { "listhtlcs", default_refresh, NULL }, - { "listforwards", default_refresh, NULL }, - { "listinvoices", default_refresh, NULL }, - { "listsendpays", default_refresh, NULL }, + /* These support wait and full pagination. */ + { "listhtlcs", paginated_refresh, "htlcs" }, + { "listforwards", paginated_refresh, "forwards" }, + { "listinvoices", paginated_refresh, "invoices" }, + { "listsendpays", paginated_refresh, "sendpays" }, + { "listnetworkevents", paginated_refresh, "networkevents" }, /* These are never changed or deleted */ { "listchainmoves", refresh_by_created_index, "chainmoves" }, { "listchannelmoves", refresh_by_created_index, "channelmoves" }, @@ -1619,7 +1844,6 @@ static const struct refresh_funcs refresh_funcs[] = { { "listtransactions", default_refresh, NULL }, { "bkpr-listaccountevents", default_refresh, NULL }, { "bkpr-listincome", default_refresh, NULL }, - { "listnetworkevents", default_refresh, NULL }, }; static const struct refresh_funcs *find_command_refresh(const char *cmdname) @@ -1654,10 +1878,11 @@ static struct table_desc *new_table_desc(const tal_t *ctx, td->arrname = json_strdup(td, schemas, arrname); td->columns = tal_arr(td, struct column *, 0); td->last_created_index = 0; + td->last_updated_index = 0; td->has_created_index = false; - td->needs_refresh = true; + td->refresh_needs = REFRESH_ALWAYS; td->refreshing = false; - td->indices_created = false; + td->populated = false; list_head_init(&td->refresh_waiters); /* Only top-levels have refresh functions */ @@ -1855,6 +2080,7 @@ static const char *init(struct command *init_cmd, struct sql *sql = sql_of(plugin); sql->db = sqlite_setup(plugin); init_tablemap(plugin, &sql->tablemap); + sql->waitcmd = aux_command(init_cmd); plugin_set_memleak_handler(plugin, memleak_mark_tablemap); return NULL; diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 9644d4161b62..5543cd2f27ec 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -4463,7 +4463,7 @@ def test_plugin_startdir_lol(node_factory): def test_autoclean_batch(node_factory): - l1 = node_factory.get_node(1) + l1 = node_factory.get_node() # Many expired invoices for i in range(100): @@ -4527,6 +4527,42 @@ def test_sql_parallel(node_factory, executor): f.result(TIMEOUT) +def test_sql_during_change(node_factory): + l1 = node_factory.get_node() + + labels = [f"test_sql_during_delete{i:02}" for i in range(10)] + for l in labels: + l1.rpc.invoice(100, l, l) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + # It should wait on correct values + l1.daemon.wait_for_logs(['lightningd: waiting on invoices created 11$', + 'lightningd: waiting on invoices updated 1$', + 'lightningd: waiting on invoices deleted 1$']) + + # Should notice extra one (note shorter expiry) + l = f"test_sql_during_delete{11}" + l1.rpc.invoice(100, l, l, expiry=10) + labels.append(l) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + l1.daemon.wait_for_log('invoices: records created, inserting from 11') + assert not l1.daemon.is_in_log('invoices: records updated') + assert not l1.daemon.is_in_log('invoices: total reload due to delete') + + # Should notice delete. + l1.rpc.delinvoice(labels[0], 'unpaid') + del labels[0] + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels]} + + l1.daemon.wait_for_log('invoices: total reload due to delete') + + # Should notice change once invoice has expired. + wait_for(lambda: only_one(l1.rpc.listinvoices(label=labels[-1])['invoices'])['status'] != 'unpaid', timeout=10 + TIMEOUT) + assert l1.rpc.sql("SELECT amount_msat, description, status FROM invoices ORDER BY description") == {'rows': [[100, l, 'unpaid'] for l in labels[:-1]] + [[100, labels[-1], 'expired']]} + l1.daemon.wait_for_log('invoices: records updated, updating from 1') + + def test_listchannels_broken_message(node_factory): """This gave a bogus BROKEN message with deprecated-apis enabled""" l1 = node_factory.get_node(options={'allow-deprecated-apis': True})