tu-wien
/
kerma
Archived
2
0
Fork 0
This repository has been archived on 2023-03-02. You can view files and clone it, but cannot push or open issues or pull requests.
kerma/utils/server.go

353 lines
8.6 KiB
Go

// Package utils provides utilities that are needed for the functioning of Kerma
package utils
import (
"bufio"
"errors"
"kerma/helpers"
"kerma/models"
"time"
"github.com/docker/go/canonical/json"
)
/*
A Server represents a Kerma server with the following properties:
- State - the current state of the application
- Handler - a TCPHandler that is used to handle requests to and from remote peers
- Logger
- Client - a new client that uses the same state and handler as this server
*/
type Server struct {
State *models.State
Handler TCPHandler
Logger helpers.Logger
Client Client
}
/*
Construct creates a new server with the specified state and handler objects
*/
func (s *Server) Construct(state *models.State, h TCPHandler) {
s.State = state
s.Handler = h
s.Client.Construct(s.State, s.Handler)
}
/*
Handle is responsible for handling requests from other Kerma nodes/clients
*/
func (s *Server) Handle() {
// Decode message received from reader, fail on error
scanner := bufio.NewScanner(s.Handler.Conn)
for scanner.Scan() {
msg, err := s.Handler.Input(scanner.Bytes())
if err != nil {
s.Handler.Fail(err.Error())
}
// Parse received message, fail on error
msgJSON, err := json.MarshalCanonical(msg)
if err != nil {
s.Logger.Error("Failed parsing message " + err.Error())
s.Handler.Fail("Failed parsing message")
}
s.Logger.Info("IN: " + s.Handler.GetRemotePeer() + " " + string(msgJSON))
remotePeer := s.Handler.GetRemotePeer()
// check message type, proceed accordingly
if msgType, ok := msg["type"]; ok {
switch msgType {
case "error":
break
case "hello":
// check if peer has already handshaked
if !s.State.CheckForHandshake(remotePeer) {
// check if hello request is valid
var helloReq models.Hello
err := helloReq.UnmarshalJSON(msgJSON)
if err == nil {
// finish handshake
s.Logger.Info("Constructing handshake with " + remotePeer)
var resp models.Hello
resp.Construct()
respJSON, _ := resp.MarshalJson()
s.Handler.Output(respJSON)
s.State.CompleteHandshake(remotePeer)
s.Client.DiscoverPeers(remotePeer)
} else {
s.Handler.Fail("Hello request invalid")
}
}
break
case "getpeers":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
// create list of known peers, send to requester
resp := s.State.FetchPeerListResponse()
resp.Peers = append(resp.Peers, s.Handler.GetLocalPeer())
respJSON, _ := resp.MarshalJson()
s.Handler.Output(respJSON)
break
case "peers":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var peers models.PeerListResponse
json.Unmarshal(msgJSON, &peers)
s.State.ParsePeerListResponse(&peers)
break
case "ihaveobject":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var obj models.ObjectWrapper
err = obj.UnmarshalJSON(msgJSON)
if err != nil {
s.Handler.Fail("Could not parse request")
break
}
if s.State.Transactions[obj.ObjectID] == nil {
resp := models.ObjectWrapper{
Type: "getobject",
ObjectID: obj.ObjectID,
}
respJSON, _ := resp.MarshalJson()
s.Handler.Output(respJSON)
}
case "getobject":
err = s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var obj models.ObjectWrapper
err := obj.UnmarshalJSON(msgJSON)
if err != nil {
s.Handler.Fail("Could not parse request")
break
}
transaction := s.State.GetTransaction(obj.ObjectID)
if transaction != nil {
s.Logger.Debug("Found transaction: " + obj.ObjectID)
resp := json.RawMessage(`{"type":"object","object":` + transaction.String() + `}`)
respJSON, _ := resp.MarshalJSON()
s.Handler.Output(respJSON)
}
block := s.State.GetBlock(obj.ObjectID)
if block != nil {
s.Logger.Debug("Found block: " + obj.ObjectID)
resp := json.RawMessage(`{"type":"object","object":` + block.String() + `}`)
respJSON, _ := resp.MarshalJSON()
s.Handler.Output(respJSON)
} else {
s.Handler.Error("Could not find object " + obj.ObjectID)
}
break
case "object":
err = s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var object models.Object
err := object.UnmarshalJSON(msgJSON)
if err != nil {
s.Logger.Debug("Could not unmarshal: " + err.Error())
s.Handler.Fail("Object request invalid")
break
}
val, err := object.GetObjectValue()
if err != nil {
s.Logger.Error("Could not parse object: " + err.Error())
s.Handler.Fail("Could not parse object request")
break
}
switch val.GetType() {
case "transaction":
err := s.State.AppendTransaction(val.GetEntity().(*models.Transaction))
if err != nil {
s.Logger.Debug("Failed appending transaction: " + err.Error())
s.Handler.Fail("Error appending transaction")
break
}
break
case "block":
block := val.(*models.Block)
go s.appendBlockToChain(block)
break
default:
s.Logger.Debug("Something went wrong with " + string(msgJSON))
s.Handler.Fail("Could not parse object request")
}
break
case "getchaintip":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
res := s.State.GetChainTip()
var chaintip models.Chaintip
chaintip.Construct(res.GetID())
respJSON, _ := chaintip.MarshalJson()
s.Handler.Output(respJSON)
break
case "chaintip":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var chaintip models.Chaintip
err = chaintip.UnmarshalJSON(msgJSON)
if err != nil {
s.Logger.Debug("Could not unmarshal: " + err.Error())
s.Handler.Fail("Object request invalid")
break
}
res := s.State.GetChainTip()
if res.GetID() != chaintip.BlockID {
s.Client.DiscoverObject(remotePeer, chaintip.BlockID)
}
break
case "getmempool":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
res := s.State.GetMempoolTransactionIDs()
var mempool models.Mempool
mempool.Construct(res)
respJSON, _ := mempool.MarshalJson()
s.Handler.Output(respJSON)
break
case "mempool":
err := s.terminateIfNoHandshake(remotePeer)
if err != nil {
break
}
var mempool models.Mempool
err = mempool.UnmarshalJSON(msgJSON)
if err != nil {
s.Logger.Debug("Could not unmarshal: " + err.Error())
s.Handler.Fail("Object request invalid")
break
}
for _, txid := range mempool.Txids {
res := s.State.GetTransaction(txid)
if res == nil {
s.Client.DiscoverObject(remotePeer, txid)
}
}
break
default:
s.Handler.Fail("Request type not supported " + msgType.(string))
}
} else {
s.Handler.Fail("Request type not specified")
}
}
}
func (s *Server) terminateIfNoHandshake(remotePeer string) error {
if !s.State.CheckForHandshake(remotePeer) {
s.Handler.Fail("Handshake not completed")
return errors.New("handshake not completed with peer " + remotePeer)
}
return nil
}
func (s *Server) appendBlockToChain(block *models.Block) {
dontAppend := false
missingTx, err := s.State.GetMissingTransactionsInBlock(block)
if err != nil {
s.Logger.Debug("Failed appending block: " + err.Error())
s.Handler.Fail("Error appending block")
return
}
if len(missingTx) != 0 {
for _, txid := range missingTx {
s.Client.DiscoverObject(s.Handler.GetRemotePeer(), txid)
waited := 0
for s.State.GetTransaction(txid) == nil {
s.Logger.Debug("Waiting for transactions " + txid)
time.Sleep(500 * time.Millisecond)
waited += 500
if waited*int(time.Millisecond) >= s.State.Config.ConnTimeout*int(time.Second) {
s.Handler.Fail("Did not receive requested transactions in time, failed appending block")
dontAppend = true
return
}
}
}
}
s.getMissingBlocks(block)
if !dontAppend {
err = s.State.AppendToChain(block)
if err != nil {
s.Logger.Debug("Failed appending block: " + err.Error())
s.Handler.Fail("Error appending block")
}
s.State.DumpBlockStore()
for _, peer := range s.State.PeerList {
if peer.Active {
s.Client.GossipBlocks(peer.Name)
}
}
}
}
func (s *Server) getMissingBlocks(block *models.Block) {
blockId := *block.Previd
if s.State.GetBlock(blockId) != nil {
return
}
s.Logger.Debug("Block missing from chain: " + blockId)
for _, peer := range s.State.PeerList {
s.Client.DiscoverObject(peer.Name, blockId)
}
for s.State.GetBlock(blockId) == nil {
s.Logger.Debug("Waiting for block " + blockId)
time.Sleep(500 * time.Millisecond)
}
}