output_room_events_table.go 8.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17 18 19
package storage

import (
	"database/sql"

K
Kegsay 已提交
20
	log "github.com/Sirupsen/logrus"
21
	"github.com/lib/pq"
R
Robert Swain 已提交
22
	"github.com/matrix-org/dendrite/syncapi/types"
K
Kegsay 已提交
23
	"github.com/matrix-org/gomatrixserverlib"
24 25 26 27
)

// outputRoomEventsSchema creates the syncapi_output_room_events table and its
// event_id index. Every statement uses IF NOT EXISTS, so executing it again on
// an already-initialised database is harmless; prepare executes it before
// preparing the table's statements.
const outputRoomEventsSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
    -- An incrementing ID which denotes the position in the log that this event resides at.
    -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
    --     This isn't a problem for us since we just want to order by this field.
    id BIGSERIAL PRIMARY KEY,
    -- The event ID for the event
    event_id TEXT NOT NULL,
    -- The 'room_id' key for the event.
    room_id TEXT NOT NULL,
    -- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
    event_json TEXT NOT NULL,
    -- A list of event IDs which represent a delta of added/removed room state. This can be NULL
    -- if there is no delta.
    add_state_ids TEXT[],
    remove_state_ids TEXT[]
);
-- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
`

const (
	// insertEventSQL stores one event row and hands back the id (stream
	// position) Postgres assigned to it.
	insertEventSQL = "" +
		"INSERT INTO syncapi_output_room_events (" +
		" room_id, event_id, event_json, add_state_ids, remove_state_ids" +
		") VALUES ($1, $2, $3, $4, $5) RETURNING id"

	// selectEventsSQL fetches rows whose event_id is in the array bound to $1.
	selectEventsSQL = "" +
		"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"

	// selectRecentEventsSQL fetches at most $4 rows for room $1 with
	// $2 < id <= $3, newest first.
	selectRecentEventsSQL = "" +
		"SELECT id, event_json FROM syncapi_output_room_events" +
		" WHERE room_id = $1 AND id > $2 AND id <= $3" +
		" ORDER BY id DESC LIMIT $4"

	// selectMaxIDSQL yields the highest id in the table (NULL when it is empty).
	selectMaxIDSQL = "" +
		"SELECT MAX(id) FROM syncapi_output_room_events"

	// selectStateInRangeSQL fetches every row with $1 < id <= $2 that carries
	// a state delta. In order for us to apply the state updates correctly,
	// rows need to be ordered in the order they were received (id).
	selectStateInRangeSQL = "" +
		"SELECT id, event_json, add_state_ids, remove_state_ids" +
		" FROM syncapi_output_room_events" +
		" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
		" ORDER BY id ASC"
)

71
type outputRoomEventsStatements struct {
72 73 74 75 76
	insertEventStmt        *sql.Stmt
	selectEventsStmt       *sql.Stmt
	selectMaxIDStmt        *sql.Stmt
	selectRecentEventsStmt *sql.Stmt
	selectStateInRangeStmt *sql.Stmt
77 78 79 80 81 82 83 84 85 86
}

func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
	_, err = db.Exec(outputRoomEventsSchema)
	if err != nil {
		return
	}
	if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
		return
	}
K
Kegsay 已提交
87 88 89
	if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
		return
	}
90 91 92
	if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil {
		return
	}
93 94 95
	if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
		return
	}
K
Kegsay 已提交
96 97 98
	if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
		return
	}
99 100 101
	return
}

102
// selectStateInRange returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
K
Kegsay 已提交
103 104
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
105 106 107
func (s *outputRoomEventsStatements) selectStateInRange(
	txn *sql.Tx, oldPos, newPos types.StreamPosition,
) (map[string]map[string]bool, map[string]streamEvent, error) {
K
Kegsay 已提交
108 109
	rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos)
	if err != nil {
110
		return nil, nil, err
K
Kegsay 已提交
111 112 113 114 115 116
	}
	// Fetch all the state change events for all rooms between the two positions then loop each event and:
	//  - Keep a cache of the event by ID (99% of state change events are for the event itself)
	//  - For each room ID, build up an array of event IDs which represents cumulative adds/removes
	// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
	// if they aren't in the event ID cache. We don't handle state deletion yet.
117
	eventIDToEvent := make(map[string]streamEvent)
K
Kegsay 已提交
118 119 120 121 122 123

	// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
	stateNeeded := make(map[string]map[string]bool)

	for rows.Next() {
		var (
124
			streamPos  int64
K
Kegsay 已提交
125 126 127 128
			eventBytes []byte
			addIDs     pq.StringArray
			delIDs     pq.StringArray
		)
129
		if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
130
			return nil, nil, err
K
Kegsay 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
		}
		// Sanity check for deleted state and whine if we see it. We don't need to do anything
		// since it'll just mark the event as not being needed.
		if len(addIDs) < len(delIDs) {
			log.WithFields(log.Fields{
				"since":   oldPos,
				"current": newPos,
				"adds":    addIDs,
				"dels":    delIDs,
			}).Warn("StateBetween: ignoring deleted state")
		}

		// TODO: Handle redacted events
		ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
		if err != nil {
146
			return nil, nil, err
K
Kegsay 已提交
147 148 149 150 151 152 153 154 155 156 157 158 159
		}
		needSet := stateNeeded[ev.RoomID()]
		if needSet == nil { // make set if required
			needSet = make(map[string]bool)
		}
		for _, id := range delIDs {
			needSet[id] = false
		}
		for _, id := range addIDs {
			needSet[id] = true
		}
		stateNeeded[ev.RoomID()] = needSet

160
		eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)}
K
Kegsay 已提交
161 162
	}

163
	return stateNeeded, eventIDToEvent, nil
K
Kegsay 已提交
164 165
}

166 167 168
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
169
func (s *outputRoomEventsStatements) selectMaxID(txn *sql.Tx) (id int64, err error) {
170 171 172 173
	stmt := s.selectMaxIDStmt
	if txn != nil {
		stmt = txn.Stmt(stmt)
	}
174
	var nullableID sql.NullInt64
175
	err = stmt.QueryRow().Scan(&nullableID)
176 177 178
	if nullableID.Valid {
		id = nullableID.Int64
	}
179 180 181
	return
}

182 183
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
// of the inserted event.
184
func (s *outputRoomEventsStatements) insertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
185
	err = txn.Stmt(s.insertEventStmt).QueryRow(
K
Kegsay 已提交
186
		event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState),
187 188
	).Scan(&streamPos)
	return
189
}
K
Kegsay 已提交
190

191
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
192 193 194
func (s *outputRoomEventsStatements) selectRecentEvents(
	txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int,
) ([]streamEvent, error) {
K
Kegsay 已提交
195
	rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit)
196 197 198 199
	if err != nil {
		return nil, err
	}
	defer rows.Close()
200
	events, err := rowsToStreamEvents(rows)
201 202 203 204 205 206
	if err != nil {
		return nil, err
	}
	// reverse the order because [0] is the newest event due to the ORDER BY in SQL-land. The reverse order makes [0] the oldest event,
	// which is correct for /sync responses.
	return reverseEvents(events), nil
207 208
}

K
Kegsay 已提交
209 210
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
// from the database.
211
func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
212 213 214
	stmt := s.selectEventsStmt
	if txn != nil {
		stmt = txn.Stmt(stmt)
K
Kegsay 已提交
215
	}
216
	rows, err := stmt.Query(pq.StringArray(eventIDs))
217 218 219
	if err != nil {
		return nil, err
	}
220 221
	defer rows.Close()
	return rowsToStreamEvents(rows)
222 223
}

224
func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) {
225
	var result []streamEvent
226
	for rows.Next() {
227 228 229 230 231
		var (
			streamPos  int64
			eventBytes []byte
		)
		if err := rows.Scan(&streamPos, &eventBytes); err != nil {
K
Kegsay 已提交
232 233
			return nil, err
		}
234
		// TODO: Handle redacted events
K
Kegsay 已提交
235 236 237 238
		ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
		if err != nil {
			return nil, err
		}
239
		result = append(result, streamEvent{ev, types.StreamPosition(streamPos)})
K
Kegsay 已提交
240 241 242
	}
	return result, nil
}
243

244
func reverseEvents(input []streamEvent) (output []streamEvent) {
245 246 247 248 249
	for i := len(input) - 1; i >= 0; i-- {
		output = append(output, input[i])
	}
	return
}