WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 429af1a

Browse files
wenxuwanCrazyHZM
authored andcommitted
Merge branch 'main' into feat/log_management
2 parents 6637e90 + 079b75e commit 429af1a

35 files changed

+685
-332
lines changed

cmd/layotto/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ import (
8888
// Configuration
8989
"mosn.io/layotto/components/configstores"
9090
"mosn.io/layotto/components/configstores/apollo"
91+
store_inmemory "mosn.io/layotto/components/configstores/in-memory"
9192

9293
// Pub/Sub
9394
dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub"
@@ -297,6 +298,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
297298
configstores.NewStoreFactory("apollo", apollo.NewStore),
298299
configstores.NewStoreFactory("etcd", etcdv3.NewStore),
299300
configstores.NewStoreFactory("nacos", nacos.NewStore),
301+
configstores.NewStoreFactory("in-memory", store_inmemory.NewStore),
300302
),
301303

302304
// RPC

cmd/layotto_multiple_api/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ import (
9494
// Configuration
9595
"mosn.io/layotto/components/configstores"
9696
"mosn.io/layotto/components/configstores/apollo"
97+
store_inmemory "mosn.io/layotto/components/configstores/in-memory"
9798
"mosn.io/layotto/components/configstores/nacos"
9899

99100
// Pub/Sub
@@ -303,6 +304,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
303304
configstores.NewStoreFactory("apollo", apollo.NewStore),
304305
configstores.NewStoreFactory("etcd", etcdv3.NewStore),
305306
configstores.NewStoreFactory("nacos", nacos.NewStore),
307+
configstores.NewStoreFactory("in-memory", store_inmemory.NewStore),
306308
),
307309

308310
// RPC

cmd/layotto_without_xds/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ import (
8585
// Configuration
8686
"mosn.io/layotto/components/configstores"
8787
"mosn.io/layotto/components/configstores/apollo"
88+
store_inmemory "mosn.io/layotto/components/configstores/in-memory"
8889

8990
// Pub/Sub
9091
dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub"
@@ -276,6 +277,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
276277
configstores.NewStoreFactory("apollo", apollo.NewStore),
277278
configstores.NewStoreFactory("etcd", etcdv3.NewStore),
278279
configstores.NewStoreFactory("nacos", nacos.NewStore),
280+
configstores.NewStoreFactory("in-memory", store_inmemory.NewStore),
279281
),
280282

281283
// RPC
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// Copyright 2021 Layotto Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package in_memory
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"sync"
20+
21+
"mosn.io/layotto/components/configstores"
22+
"mosn.io/layotto/components/pkg/actuators"
23+
"mosn.io/layotto/components/trace"
24+
)
25+
26+
var (
27+
once sync.Once
28+
readinessIndicator *actuators.HealthIndicator
29+
livenessIndicator *actuators.HealthIndicator
30+
)
31+
32+
const (
33+
componentName = "configstore-memory"
34+
defaultGroup = "default"
35+
defaultLabel = "default"
36+
)
37+
38+
func init() {
39+
readinessIndicator = actuators.NewHealthIndicator()
40+
livenessIndicator = actuators.NewHealthIndicator()
41+
}
42+
43+
type InMemoryConfigStore struct {
44+
data *sync.Map
45+
listener *sync.Map
46+
storeName string
47+
appId string
48+
}
49+
50+
func NewStore() configstores.Store {
51+
once.Do(func() {
52+
indicators := &actuators.ComponentsIndicator{ReadinessIndicator: readinessIndicator, LivenessIndicator: livenessIndicator}
53+
actuators.SetComponentsIndicator(componentName, indicators)
54+
})
55+
return &InMemoryConfigStore{
56+
data: &sync.Map{},
57+
listener: &sync.Map{},
58+
}
59+
}
60+
61+
func (m *InMemoryConfigStore) Init(config *configstores.StoreConfig) error {
62+
m.appId = config.AppId
63+
m.storeName = config.StoreName
64+
readinessIndicator.SetStarted()
65+
livenessIndicator.SetStarted()
66+
return nil
67+
}
68+
69+
// Get gets configuration from configuration store.
70+
func (m *InMemoryConfigStore) Get(ctx context.Context, req *configstores.GetRequest) ([]*configstores.ConfigurationItem, error) {
71+
72+
res := make([]*configstores.ConfigurationItem, 0, len(req.Keys))
73+
74+
for _, key := range req.Keys {
75+
value, ok := m.data.Load(key)
76+
if ok {
77+
config := &configstores.ConfigurationItem{
78+
Content: value.(string),
79+
Key: key,
80+
Group: req.Group,
81+
}
82+
res = append(res, config)
83+
}
84+
}
85+
trace.SetExtraComponentInfo(ctx, fmt.Sprintf("method: %+v, store: %+v", "Get", "memory"))
86+
return res, nil
87+
}
88+
89+
// Set saves configuration into configuration store.
90+
func (m *InMemoryConfigStore) Set(ctx context.Context, req *configstores.SetRequest) error {
91+
if len(req.Items) == 0 {
92+
return fmt.Errorf("params illegal:item is empty")
93+
}
94+
for _, item := range req.Items {
95+
m.data.Store(item.Key, item.Content)
96+
m.notifyChanged(item)
97+
}
98+
return nil
99+
}
100+
101+
// Delete deletes configuration from configuration store.
102+
func (m *InMemoryConfigStore) Delete(ctx context.Context, req *configstores.DeleteRequest) error {
103+
for _, key := range req.Keys {
104+
m.data.Delete(key)
105+
}
106+
return nil
107+
}
108+
109+
// Subscribe gets configuration from configuration store and subscribe the updates.
110+
func (m *InMemoryConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan *configstores.SubscribeResp) error {
111+
if request.Group == "" && len(request.Keys) > 0 {
112+
request.Group = defaultGroup
113+
}
114+
115+
ctx := context.Background()
116+
req := &configstores.GetRequest{
117+
AppId: request.AppId,
118+
Group: request.Group,
119+
Label: request.Label,
120+
Keys: request.Keys,
121+
Metadata: request.Metadata,
122+
}
123+
124+
for _, key := range req.Keys {
125+
m.listener.Store(key, m.subscribeOnChange(ch))
126+
}
127+
128+
items, err := m.Get(ctx, req)
129+
if err != nil {
130+
return err
131+
}
132+
133+
for _, item := range items {
134+
m.notifyChanged(item)
135+
}
136+
137+
return nil
138+
}
139+
140+
func (m *InMemoryConfigStore) notifyChanged(item *configstores.ConfigurationItem) {
141+
f, ok := m.listener.Load(item.Key)
142+
if ok {
143+
f.(OnChangeFunc)(item.Group, item.Key, item.Content)
144+
}
145+
}
146+
147+
type OnChangeFunc func(group, dataId, data string)
148+
149+
func (m *InMemoryConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp) OnChangeFunc {
150+
return func(group, dataId, data string) {
151+
resp := &configstores.SubscribeResp{
152+
StoreName: m.storeName,
153+
AppId: m.appId,
154+
Items: []*configstores.ConfigurationItem{
155+
{
156+
Key: dataId,
157+
Content: data,
158+
Group: group,
159+
},
160+
},
161+
}
162+
163+
ch <- resp
164+
}
165+
}
166+
167+
func (m *InMemoryConfigStore) StopSubscribe() {
168+
// stop listening all subscribed configs
169+
m.listener.Range(func(key, value any) bool {
170+
m.listener.Delete(key)
171+
return true
172+
})
173+
}
174+
175+
func (m *InMemoryConfigStore) GetDefaultGroup() string {
176+
return defaultGroup
177+
}
178+
179+
func (m *InMemoryConfigStore) GetDefaultLabel() string {
180+
return defaultLabel
181+
}

0 commit comments

Comments
 (0)