Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
FinClip
ligase
提交
a4230089
ligase
项目概览
FinClip
/
ligase
通知
33
Star
8
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
ligase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a4230089
编写于
3月 29, 2017
作者:
K
Kegsay
提交者:
GitHub
3月 29, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Read roomserver output log and remember position across restarts (#52)
上级
7ab04366
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
104 addition
and
3 deletion
+104
-3
src/github.com/matrix-org/dendrite/clientapi/config/config.go
...github.com/matrix-org/dendrite/clientapi/config/config.go
+2
-0
src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
...thub.com/matrix-org/dendrite/clientapi/routing/routing.go
+2
-2
src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go
...b.com/matrix-org/dendrite/clientapi/storage/syncserver.go
+39
-0
src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go
...thub.com/matrix-org/dendrite/clientapi/sync/syncserver.go
+43
-0
src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go
....com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go
+17
-1
src/github.com/matrix-org/dendrite/common/partition_offset_table.go
....com/matrix-org/dendrite/common/partition_offset_table.go
+1
-0
未找到文件。
src/github.com/matrix-org/dendrite/clientapi/config/config.go
浏览文件 @
a4230089
...
...
@@ -25,4 +25,6 @@ type Sync struct {
RoomserverOutputTopic
string
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
KafkaConsumerURIs
[]
string
// The postgres connection config for connecting to the database e.g a postgres:// URI
DataSource
string
}
src/github.com/matrix-org/dendrite/clientapi/routing/routing.go
浏览文件 @
a4230089
...
...
@@ -48,8 +48,8 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI
servMux
.
Handle
(
"/api/"
,
http
.
StripPrefix
(
"/api"
,
apiMux
))
}
// SetupSyncServer configures the given mux with sync-server listeners
func
SetupSyncServer
(
servMux
*
http
.
ServeMux
,
httpClient
*
http
.
Client
,
cfg
config
.
Sync
)
{
// SetupSyncServer
Listeners
configures the given mux with sync-server listeners
func
SetupSyncServer
Listeners
(
servMux
*
http
.
ServeMux
,
httpClient
*
http
.
Client
,
cfg
config
.
Sync
)
{
apiMux
:=
mux
.
NewRouter
()
r0mux
:=
apiMux
.
PathPrefix
(
pathPrefixR0
)
.
Subrouter
()
r0mux
.
Handle
(
"/sync"
,
make
(
"sync"
,
util
.
NewJSONRequestHandler
(
func
(
req
*
http
.
Request
)
util
.
JSONResponse
{
...
...
src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go
0 → 100644
浏览文件 @
a4230089
package
storage
import
(
"database/sql"
"github.com/matrix-org/dendrite/common"
// Import the postgres database driver.
_
"github.com/lib/pq"
)
// SyncServerDatabase represents a sync server database
type
SyncServerDatabase
struct
{
db
*
sql
.
DB
partitions
common
.
PartitionOffsetStatements
}
// NewSyncServerDatabase creates a new sync server database
func
NewSyncServerDatabase
(
dataSourceName
string
)
(
*
SyncServerDatabase
,
error
)
{
var
db
*
sql
.
DB
var
err
error
if
db
,
err
=
sql
.
Open
(
"postgres"
,
dataSourceName
);
err
!=
nil
{
return
nil
,
err
}
partitions
:=
common
.
PartitionOffsetStatements
{}
if
err
=
partitions
.
Prepare
(
db
);
err
!=
nil
{
return
nil
,
err
}
return
&
SyncServerDatabase
{
db
,
partitions
},
nil
}
// PartitionOffsets implements common.PartitionStorer
func
(
d
*
SyncServerDatabase
)
PartitionOffsets
(
topic
string
)
([]
common
.
PartitionOffset
,
error
)
{
return
d
.
partitions
.
SelectPartitionOffsets
(
topic
)
}
// SetPartitionOffset implements common.PartitionStorer
func
(
d
*
SyncServerDatabase
)
SetPartitionOffset
(
topic
string
,
partition
int32
,
offset
int64
)
error
{
return
d
.
partitions
.
UpsertPartitionOffset
(
topic
,
partition
,
offset
)
}
src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go
0 → 100644
浏览文件 @
a4230089
package
sync
import
(
log
"github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/common"
sarama
"gopkg.in/Shopify/sarama.v1"
)
// Server contains all the logic for running a sync server
type
Server
struct
{
roomServerConsumer
*
common
.
ContinualConsumer
}
// NewServer creates a new sync server. Call Start() to begin consuming from room servers.
func
NewServer
(
cfg
*
config
.
Sync
,
store
common
.
PartitionStorer
)
(
*
Server
,
error
)
{
kafkaConsumer
,
err
:=
sarama
.
NewConsumer
(
cfg
.
KafkaConsumerURIs
,
nil
)
if
err
!=
nil
{
return
nil
,
err
}
consumer
:=
common
.
ContinualConsumer
{
Topic
:
cfg
.
RoomserverOutputTopic
,
Consumer
:
kafkaConsumer
,
PartitionStore
:
store
,
}
s
:=
&
Server
{
roomServerConsumer
:
&
consumer
,
}
consumer
.
ProcessMessage
=
s
.
onMessage
return
s
,
nil
}
// Start consuming from room servers
func
(
s
*
Server
)
Start
()
error
{
return
s
.
roomServerConsumer
.
Start
()
}
func
(
s
*
Server
)
onMessage
(
msg
*
sarama
.
ConsumerMessage
)
error
{
log
.
WithField
(
"key"
,
string
(
msg
.
Key
))
.
WithField
(
"val"
,
string
(
msg
.
Value
))
.
Info
(
"Recv"
)
return
nil
}
src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go
浏览文件 @
a4230089
...
...
@@ -7,6 +7,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/storage"
"github.com/matrix-org/dendrite/clientapi/sync"
log
"github.com/Sirupsen/logrus"
"github.com/matrix-org/dugong"
...
...
@@ -40,10 +42,24 @@ func main() {
cfg
:=
config
.
Sync
{
KafkaConsumerURIs
:
[]
string
{
"localhost:9092"
},
RoomserverOutputTopic
:
"roomserverOutput"
,
DataSource
:
"postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable"
,
}
log
.
Info
(
"Starting sync server"
)
routing
.
SetupSyncServer
(
http
.
DefaultServeMux
,
http
.
DefaultClient
,
cfg
)
db
,
err
:=
storage
.
NewSyncServerDatabase
(
cfg
.
DataSource
)
if
err
!=
nil
{
log
.
Panicf
(
"startup: failed to create sync server database with data source %s : %s"
,
cfg
.
DataSource
,
err
)
}
server
,
err
:=
sync
.
NewServer
(
&
cfg
,
db
)
if
err
!=
nil
{
log
.
Panicf
(
"startup: failed to create sync server: %s"
,
err
)
}
if
err
=
server
.
Start
();
err
!=
nil
{
log
.
Panicf
(
"startup: failed to start sync server"
)
}
routing
.
SetupSyncServerListeners
(
http
.
DefaultServeMux
,
http
.
DefaultClient
,
cfg
)
log
.
Fatal
(
http
.
ListenAndServe
(
bindAddr
,
nil
))
}
src/github.com/matrix-org/dendrite/common/partition_offset_table.go
浏览文件 @
a4230089
...
...
@@ -57,6 +57,7 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part
if
err
:=
rows
.
Scan
(
&
offset
.
Partition
,
&
offset
.
Offset
);
err
!=
nil
{
return
nil
,
err
}
results
=
append
(
results
,
offset
)
}
return
results
,
nil
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录