提交 b91b3e72 编写于 作者: M Mark Haines 提交者: GitHub

Use a shared PostgreSQL sequence to generate ids. (#237)

* Use a shared PostgreSQL sequence to generate ids.

Share an auto incrementing sequnce between the account data and
the room event table.
This means that account data updates can be received independantly of
room events updates.

This should give some basic support for fixing #212

* Remove redundant 'primary key'

* Re-number the SQL arguments

* Fewer lies in comments
上级 fbc4477b
......@@ -18,14 +18,20 @@ import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/syncapi/types"
)
const accountDataSchema = `
-- Stores the users account data
-- This sequence is shared between all the tables generated from kafka logs.
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores the types of account data that a user set has globally and in each room
-- and the stream ID when that type was last updated.
CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
-- The highest numeric ID from the output_room_events at the time of saving the data
id BIGINT,
-- An incrementing ID which denotes the position in the log that this event resides at.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- ID of the user the data belongs to
user_id TEXT NOT NULL,
-- ID of the room the data is related to (empty string if not related to a specific room)
......@@ -33,8 +39,6 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type (
-- Type of the data
type TEXT NOT NULL,
PRIMARY KEY(user_id, room_id, type),
-- We don't want two entries of the same type for the same user
CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type)
);
......@@ -43,18 +47,23 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account
`
const insertAccountDataSQL = "" +
"INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
"INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" +
" ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" +
" DO UPDATE SET id = EXCLUDED.id"
" DO UPDATE SET id = EXCLUDED.id" +
" RETURNING id"
const selectAccountDataInRangeSQL = "" +
"SELECT room_id, type FROM syncapi_account_data_type" +
" WHERE user_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC"
const selectMaxAccountDataIDSQL = "" +
"SELECT MAX(id) FROM syncapi_account_data_type"
type accountDataStatements struct {
insertAccountDataStmt *sql.Stmt
selectAccountDataInRangeStmt *sql.Stmt
selectMaxAccountDataIDStmt *sql.Stmt
}
func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
......@@ -68,15 +77,17 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
return
}
if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil {
return
}
return
}
func (s *accountDataStatements) insertAccountData(
ctx context.Context,
pos types.StreamPosition,
userID, roomID, dataType string,
) (err error) {
_, err = s.insertAccountDataStmt.ExecContext(ctx, pos, userID, roomID, dataType)
) (pos int64, err error) {
s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos)
return
}
......@@ -116,3 +127,15 @@ func (s *accountDataStatements) selectAccountDataInRange(
return
}
func (s *accountDataStatements) selectMaxAccountDataID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
}
return
}
......@@ -26,12 +26,15 @@ import (
)
const outputRoomEventsSchema = `
-- This sequence is shared between all the tables generated from kafka logs.
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field.
id BIGSERIAL PRIMARY KEY,
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
event_id TEXT NOT NULL,
-- The 'room_id' key for the event.
......@@ -60,7 +63,7 @@ const selectRecentEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectMaxIDSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
......@@ -73,7 +76,7 @@ const selectStateInRangeSQL = "" +
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxIDStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
......@@ -89,7 +92,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
return
}
if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
return
}
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
......@@ -170,11 +173,11 @@ func (s *outputRoomEventsStatements) selectStateInRange(
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
func (s *outputRoomEventsStatements) selectMaxID(
func (s *outputRoomEventsStatements) selectMaxEventID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
stmt := common.TxStmt(txn, s.selectMaxIDStmt)
stmt := common.TxStmt(txn, s.selectMaxEventIDStmt)
err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid {
id = nullableID.Int64
......
......@@ -174,11 +174,24 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
id, err := d.events.selectMaxID(ctx, nil)
return d.syncStreamPositionTx(ctx, nil)
}
func (d *SyncServerDatabase) syncStreamPositionTx(
ctx context.Context, txn *sql.Tx,
) (types.StreamPosition, error) {
maxID, err := d.events.selectMaxEventID(ctx, txn)
if err != nil {
return 0, err
}
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
if err != nil {
return types.StreamPosition(0), err
return 0, err
}
if maxAccountDataID > maxID {
maxID = maxAccountDataID
}
return types.StreamPosition(id), nil
return types.StreamPosition(maxID), nil
}
// IncrementalSync returns all the data needed in order to create an incremental sync response.
......@@ -271,11 +284,10 @@ func (d *SyncServerDatabase) CompleteSync(
defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on.
id, err := d.events.selectMaxID(ctx, txn)
pos, err := d.syncStreamPositionTx(ctx, txn)
if err != nil {
return nil, err
}
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
......@@ -348,13 +360,8 @@ func (d *SyncServerDatabase) GetAccountDataInRange(
func (d *SyncServerDatabase) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string,
) (types.StreamPosition, error) {
pos, err := d.SyncStreamPosition(ctx)
if err != nil {
return pos, err
}
err = d.accountData.insertAccountData(ctx, pos, userID, roomID, dataType)
return pos, err
pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType)
return types.StreamPosition(pos), err
}
func (d *SyncServerDatabase) addInvitesToResponse(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册