發(fā)布于:2021-01-25 10:59:01
0
275
0
在一個典型的系統(tǒng)中,您可以在安全的地方(如ZooKeeper或etcd)進(jìn)出數(shù)據(jù)。然后在不一定安全的地方對數(shù)據(jù)進(jìn)行操作,例如應(yīng)用程序。盡管etcd和ZooKeeper提供了對其擁有的數(shù)據(jù)的一致性的保證,但它們不能保證可能涉及應(yīng)用程序其他部分的更廣泛的狀態(tài)轉(zhuǎn)換和事務(wù)的一致性。為此,我們需要另一種方法。
改變范式
Copycat是一個框架,它避開了依賴于外部系統(tǒng)構(gòu)建應(yīng)用程序以實(shí)現(xiàn)數(shù)據(jù)一致性的標(biāo)準(zhǔn)范例,允許您將作為狀態(tài)機(jī)編寫的應(yīng)用程序邏輯直接嵌入Copycat中,在那里,一致性和容錯性由您負(fù)責(zé)。其結(jié)果是能夠以相對簡單和簡潔的方式實(shí)現(xiàn)復(fù)雜分布式協(xié)調(diào)問題的解決方案,并封裝應(yīng)用程序的邏輯和語義,而不必?fù)?dān)心可靠性保證。
我們可以用模仿者建立什么樣的東西?這取決于你。從低級分布式原語(如鎖、組和映射)到成熟的分布式系統(tǒng)(用于調(diào)度、消息傳遞、服務(wù)發(fā)現(xiàn)或數(shù)據(jù)存儲),幾乎任何事情都是可能的。
從零到分布式數(shù)據(jù)存儲
從Copycat開始的一個好地方是用它構(gòu)建一些東西,所以讓我們創(chuàng)建一個分布式的鍵值數(shù)據(jù)存儲。不過,我們的目標(biāo)并不是只創(chuàng)建任何數(shù)據(jù)存儲,我們需要的是具有強(qiáng)大一致性保證、網(wǎng)絡(luò)分區(qū)容錯性、節(jié)點(diǎn)故障耐久性以及數(shù)據(jù)更改通知的數(shù)據(jù)存儲—類似于etcd。真的有可能在10分鐘內(nèi)建立一個etcd克隆嗎?好吧,不,但我們可以驚人地關(guān)閉,建立一個數(shù)據(jù)存儲與相同的基本功能,更重要的是,相同的可靠性保證,在這段時間內(nèi)。
狀態(tài)機(jī)
構(gòu)建數(shù)據(jù)存儲的第一步是定義一個狀態(tài)機(jī)來包含數(shù)據(jù)存儲的狀態(tài)和邏輯。因?yàn)槲覀兊臄?shù)據(jù)存儲存儲的是鍵值對,所以我們將使用一個簡單的HashMap
將數(shù)據(jù)封裝到內(nèi)存中。說真的,哈希圖?線程安全怎么辦?耐用性如何?模仿者將為我們處理這些事情,因?yàn)槲覀兩院髮⒘私飧?。但首先,讓我們定義狀態(tài)機(jī):
public class KeyValueStore extends StateMachine {
private Mapstorage = new HashMap<>();
}
為了在狀態(tài)機(jī)上操作,我們需要聲明一些操作。Copycat支持兩種類型的操作:用于寫入的命令和用于讀取的查詢。讓我們從定義一些基本的etcd樣式操作開始:put
、get
和delete
:
public class Put implements Command{
public Object key;
public Object value;
public Put(Object key, Object value) {
this.key = key;
this.value = value;
}
}
public class Get implements Query{
public Object key;
public Get(Object key) {
this.key = key;
}
}
public class Delete implements Command{
public Object key;
public Delete(Object key) {
this.key = key;
}
}
定義了操作之后,讓我們在StateMachine
中實(shí)現(xiàn)對它們的處理:
public class KeyValueStore extends StateMachine {
private Mapstorage = new HashMap<>();
public Object put(Commitcommit) {
Commitput = storage.put(commit.operation().key, commit);
return put == null ? null : put.operation().value;
}
public Object get(Commitcommit) {
try {
Commitput = map.get(commit.operation().key);
return put == null ? null : put.operation().value;
} finally {
commit.release();
}
}
public Object delete(Commitcommit) {
Commitput = null;
try {
put = storage.remove(commit.operation().key);
return put == null ? null : put.operation().value;
} finally {
if (put != null)
put.release();
commit.release();
}
}
}
如您所見,put
、get
和delete
實(shí)現(xiàn)處理包含提交到狀態(tài)機(jī)的操作的對象。操作是在一個線程上執(zhí)行的,因此線程安全不是問題,在處理之后,操作返回一個反映機(jī)器內(nèi)部狀態(tài)的結(jié)果。
除了狀態(tài)機(jī)的存儲之外,Copycat還存儲狀態(tài)機(jī)處理的每個命令的內(nèi)部日志及其結(jié)果,用于故障處理和其他目的。定期對日志執(zhí)行壓縮,以便刪除不再需要的提交。為了幫助Copycat知道何時可以安全地從其日志中刪除提交,狀態(tài)機(jī)應(yīng)該release
不影響機(jī)器狀態(tài)的提交。例如,Put
操作只有在接收到同一按鍵的Delete
操作后才會釋放。另一方面,Get
操作會立即釋放,因?yàn)樗粫绊憴C(jī)器的狀態(tài)。
有了它,我們的基本鍵值存儲現(xiàn)在就實(shí)現(xiàn)了!稍后我們將添加一些更高級的操作,但現(xiàn)在讓我們準(zhǔn)備嘗試一下。
創(chuàng)建服務(wù)器
為了管理狀態(tài)機(jī),我們需要構(gòu)建一個CopycatServer
實(shí)例。服務(wù)器必須用一個地址初始化,以便在其上偵聽通信:
Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
我們將配置服務(wù)器以使用狀態(tài)機(jī):
builder.withStateMachine(KeyValueStore::new);
并為服務(wù)器配置一個Transport
,以便在與群集中的其他服務(wù)器進(jìn)行通信時使用:
builder.withTransport(NettyTransport.builder()
.withThreads(4)
.build());
我們將為狀態(tài)機(jī)的日志配置Storage
實(shí)現(xiàn),在本例中使用磁盤存儲:
builder.withStorage(Storage.builder()
.withDirectory(new File("logs"))
.withStorageLevel(StorageLevel.DISK)
.build());
最后我們將創(chuàng)建服務(wù)器:
CopycatServer server = builder.build();
引導(dǎo)群集
一旦建立了一個服務(wù)器,我們就可以使用它來引導(dǎo)一個新的集群:
server.bootstrap().thenAccept(srvr -> System.out.println(srvr + " has bootstrapped a cluster"));
此時,我們的狀態(tài)機(jī)已啟動并運(yùn)行,但讓我們將一些其他服務(wù)器加入群集:
Address clusterAddress = new Address("123.456.789.0", 5000);
server.join(clusterAddress).thenAccept(srvr -> System.out.println(srvr + " has joined the cluster"));
就這樣,我們創(chuàng)建了一個集群鍵值存儲!
執(zhí)行操作
為了將操作提交到數(shù)據(jù)存儲,我們需要創(chuàng)建一個CopycatClient
。我們將確保為客戶端配置與為服務(wù)器配置相同的Transport
:
CopycatClient client = CopycatClient.builder()
.withTransport(NettyTransport.builder()
.withThreads(2)
.build())
.build();
然后,我們將客戶端指向集群中的任何服務(wù)器,并connect
:
Address clusterAddress = new Address("123.456.789.0", 5000);
client.connect(clusterAddress).join();
我們的客戶端已連接,讓我們提交一個put
操作:
CompletableFuturefuture = client.submit(new Put("foo", "Hello world!"));
Object result = future.get();
我們也可以提交get
和delete
操作,提交方式與提交put
相同:
client.submit(new Get("foo")).thenAccept(result -> System.out.println("foo is: " + result));
client.submit(new Delete("foo")).thenRun(() -> System.out.println("foo has been deleted"));
從這里開始,我們可以將客戶機(jī)包裝在CLI或restapi中,以允許其他類型的訪問,但我們將把它作為練習(xí)留到下次使用。
實(shí)現(xiàn)一致性
現(xiàn)在我們已經(jīng)有了一個初始的系統(tǒng)并在運(yùn)行,讓我們退一步來討論一下在幕后發(fā)生了什么。請記住,在一開始我們就說過,僅僅構(gòu)建自己的鍵值存儲是不夠的,我們希望它能夠完全復(fù)制、持久、高度一致,并且能夠處理失敗。我們該怎么做?原來,我們已經(jīng)有了。
Copycat利用Raft一致性算法的復(fù)雜實(shí)現(xiàn)來確保以安全的方式將針對狀態(tài)機(jī)的每個操作復(fù)制到集群的每個成員。為了實(shí)現(xiàn)這一點(diǎn),集群中的每臺服務(wù)器都維護(hù)一個單獨(dú)的狀態(tài)機(jī)副本以及在狀態(tài)機(jī)上執(zhí)行的所有操作及其結(jié)果的日志。日志可以根據(jù)配置的StorageLevel
持久存儲,并用于在發(fā)生故障時恢復(fù)機(jī)器的狀態(tài)。
為了實(shí)現(xiàn)強(qiáng)一致性,Copycat利用多數(shù)仲裁來確保寫入操作在生效之前得到集群中大多數(shù)節(jié)點(diǎn)的批準(zhǔn)。如果網(wǎng)絡(luò)分區(qū)或系統(tǒng)出現(xiàn)故障,無法再實(shí)現(xiàn)仲裁,Copycat將停止處理寫操作,以防止發(fā)生數(shù)據(jù)不一致。
復(fù)制集群選擇一個領(lǐng)導(dǎo)者作為處理操作的焦點(diǎn)。當(dāng)客戶機(jī)向服務(wù)器提交一個命令時,它將被轉(zhuǎn)發(fā)給leader,leader將命令發(fā)送給集群的其余部分。然后,每個服務(wù)器將該命令應(yīng)用于其狀態(tài)機(jī),將結(jié)果附加到其日志中,并將響應(yīng)返回給領(lǐng)隊(duì)。一旦leader接收到來自集群大多數(shù)成員(包括它自己)的響應(yīng),它就會將命令應(yīng)用到自己的狀態(tài)機(jī),然后log將響應(yīng)發(fā)送回客戶機(jī)。
Copycat支持每個查詢操作的可配置一致性級別。當(dāng)客戶機(jī)向服務(wù)器提交查詢時,如果需要線性一致性,則可以將其轉(zhuǎn)發(fā)給leader;如果順序一致性足夠,則可以由任何服務(wù)器響應(yīng)。
實(shí)現(xiàn)容錯
Copycat利用心跳和超時來維護(hù)服務(wù)器之間的健康連接。如果一個領(lǐng)導(dǎo)者未能在配置的超時時間內(nèi)發(fā)出心跳信號,集群的其余成員將選擇一個新的領(lǐng)導(dǎo)者來協(xié)調(diào)操作的處理。同樣,如果跟隨者未能響應(yīng)心跳,則該服務(wù)器可能會從集群中刪除。
由于Copycat需要多數(shù)仲裁才能保持一致性并保持可用性,因此Copycat支持被動服務(wù)器和保留服務(wù)器,以便在發(fā)生故障時替換主動服務(wù)器。當(dāng)一個新服務(wù)器加入集群時,領(lǐng)導(dǎo)者將其日志流式傳輸?shù)椒?wù)器,然后服務(wù)器將記錄的操作應(yīng)用到其狀態(tài)機(jī)。一旦服務(wù)器被完全捕獲,領(lǐng)導(dǎo)者將把新服務(wù)器提升為集群的活動成員。
現(xiàn)在我們了解了一點(diǎn)Copycat是如何將我們的基本狀態(tài)機(jī)轉(zhuǎn)變成一個健壯的、分布式的鍵值存儲的,讓我們回到我們的實(shí)現(xiàn),并添加一些更高級的功能。
活下去的時間
etcd支持的一個很好的特性是按鍵的生存時間。這允許在特定時間段后自動刪除密鑰。讓我們將TTL支持添加到我們的數(shù)據(jù)存儲中。我們首先定義一個新的PutWithTtl
命令:
public class PutWithTtl implements Command{
public Object key;
public Object value;
public long ttl;
public PutWithTtl(Object key, Object value, long ttl) {
this.key = key;
this.value = value;
this.ttl = ttl;
}
@Override
public CompactionMode compaction() {
return CompactionMode.EXPIRING;
}
}
由于一個PutWithTtl
命令應(yīng)該會在一段時間后刪除狀態(tài),因此我們需要向Copycat指出這一點(diǎn),以便它可以從日志中正確壓縮這些提交。為此,我們提供了一個返回CompactionMode.EXPIRING
的compaction()
實(shí)現(xiàn)。
接下來,我們需要在狀態(tài)機(jī)中實(shí)現(xiàn)對PutWithTtl
命令的處理:
public Object putWithTtl(Commitcommit) {
Object result = storage.put(commit.operation().key, commit);
executor.schedule(Duration.ofMillis(commit.operation().ttl), () -> {
storage.remove(commit.operation().key);
commit.release();
});
return result;
}
在這里,我們計劃在超過TTL之后執(zhí)行一個將來的操作,這將從存儲中刪除commit并釋放它,類似于前面的delete實(shí)現(xiàn)。我們使用狀態(tài)機(jī)的內(nèi)部執(zhí)行器來安排條目刪除,因?yàn)檫@樣可以確保在狀態(tài)機(jī)內(nèi)部不會遇到任何線程安全問題。
看看會發(fā)生什么
隨著TTL的實(shí)現(xiàn),讓我們添加最后一個特性:觀察者。etcd和ZooKeeper中的觀察者允許客戶端在訪問密鑰時接收通知。這是實(shí)現(xiàn)各種協(xié)調(diào)模式的一個重要特性,但它通常帶有各種警告,包括嚴(yán)格的語義和較低的可靠性保證。
另一方面,Copycat提供了一種會話事件處理功能,允許從狀態(tài)機(jī)中的任何位置將任意數(shù)據(jù)直接發(fā)布到客戶端。這種靈活性使我們能夠輕松地建模復(fù)雜的分布式原語,如組、領(lǐng)導(dǎo)人選舉和消息傳遞,其中服務(wù)器端信息以高效和語義適當(dāng)?shù)姆绞桨l(fā)布到客戶端。會話事件保證在服務(wù)器發(fā)生故障時不會丟失,并且始終按順序傳遞。
為了利用數(shù)據(jù)存儲的會話事件,我們將首先定義一個新的Listen
命令,該命令將指示客戶端對從狀態(tài)機(jī)接收事件的興趣:
public class Listen implements Command{
}
接下來,我們將增強(qiáng)KeyValueStore
實(shí)現(xiàn)以處理Listen
命令:
public class KeyValueStore extends StateMachine {
private Mapstorage = new HashMap<>();
private Setlisteners = new HashSet<>();
public void listen(Commitcommit) {
listeners.add(commit);
}
listen
方法只存儲客戶端提交的commit,稍后我們將使用它將事件發(fā)布回客戶端。我們需要定義一個EntryEvent
類型來封裝我們的事件數(shù)據(jù):
public class EntryEventimplements Serializable {
public Object key;
public Object oldValue;
public Object newValue;
public EntryEvent(Object key, Object oldValue, Object newValue) {
this.key = key;
this.oldValue = oldValue;
this.newValue = newValue;
}
public String toString() {
return String.format("EntryEvent [key=%s, oldValue=%s, newValue=%s]", key, oldValue, newValue);
}
}
最后,我們將增強(qiáng)KeyValueStore
以使用與任何Listen
命令關(guān)聯(lián)的客戶端會話,從現(xiàn)有的命令處理程序中發(fā)布EntryEvent
:
private void publish(String event, Object key, Object oldValue, Object newValue) {
listeners.forEach(commit -> {
commit.session().publish(event, new EntryEvent(key, oldValue, newValue));
});
}
public Object put(Commitcommit) {
Commitput = storage.put(commit.operation().key, commit);
Object oldValue = put == null ? null : put.operation().value;
publish("put", commit.operation().key, oldValue, commit.operation().value);
return oldValue;
}
public Object putWithTtl(Commitcommit) {
Object result = storage.put(commit.operation().key, commit);
executor.schedule(Duration.ofMillis(commit.operation().ttl), () -> {
Commitput = storage.remove(commit.operation().key);
Object oldValue = put == null ? null : put.operation().value;
publish("expire", commit.operation().key, oldValue, null);
commit.release();
});
return result;
}
public Object delete(Commitcommit) {
Commitput = null;
try {
put = storage.remove(commit.operation().key);
Object oldValue = put == null ? null : put.operation().value;
publish("delete", commit.operation().key, oldValue, null);
return oldValue;
} finally {
if (put != null)
put.release();
commit.release();
}
}
在客戶端,我們將發(fā)布一個Listen
命令來表示我們對接收事件的興趣:
client.submit(new Listen()).thenRun(() -> LOG.info("Now listening for events")).join();
然后我們可以為特定事件注冊事件偵聽器:
client.onEvent("put", (EntryEvent event) -> System.out.println("Put: " + event));
client.onEvent("delete", (EntryEvent event) -> System.out.println("Delete: " + event));
client.onEvent("expire", (EntryEvent event) -> System.out.println("Expire: " + event));
現(xiàn)在,當(dāng)數(shù)據(jù)存儲中發(fā)生狀態(tài)更改時,將通知客戶機(jī)。
總結(jié)
好吧,就這樣。我們的10分鐘結(jié)束了,在Copycat的幫助下,我們從頭開始創(chuàng)建了一個生產(chǎn)就緒、高度一致的集群鍵值存儲。我們還學(xué)習(xí)了一些關(guān)于分布式系統(tǒng)中的一致性和容錯性的知識,希望現(xiàn)在我們可以用我們的新知識創(chuàng)建一些其他的東西。
Copycat和它的姐妹項(xiàng)目Atomix的目標(biāo)并不是要構(gòu)建一個像etcd這樣的特定技術(shù)的克隆,就像現(xiàn)在看起來的那樣。其目標(biāo)是授權(quán)用戶構(gòu)建適合自己需要的系統(tǒng)。
模仿可以讓我們比以前更快、更安全、更容易地構(gòu)建復(fù)雜的系統(tǒng)。那么,既然你已經(jīng)看到了它能做什么,你將建立什么?