// Package utils provides utilities that are needed for the functioning of Kerma package utils import ( "bufio" "errors" "kerma/helpers" "kerma/models" "time" "github.com/docker/go/canonical/json" ) /* A Server represents a Kerma server with the following properties: - State - the current state of the application - Handler - a TCPHandler that is used to handle requests to and from remote peers - Logger - Client - a new client that uses the same state and handler as this server */ type Server struct { State *models.State Handler TCPHandler Logger helpers.Logger Client Client } /* Construct creates a new server with the specified state and handler objects */ func (s *Server) Construct(state *models.State, h TCPHandler) { s.State = state s.Handler = h s.Client.Construct(s.State, s.Handler) } /* Handle is responsible for handling requests from other Kerma nodes/clients */ func (s *Server) Handle() { // Decode message received from reader, fail on error scanner := bufio.NewScanner(s.Handler.Conn) for scanner.Scan() { msg, err := s.Handler.Input(scanner.Bytes()) if err != nil { s.Handler.Fail(err.Error()) } // Parse received message, fail on error msgJSON, err := json.MarshalCanonical(msg) if err != nil { s.Logger.Error("Failed parsing message " + err.Error()) s.Handler.Fail("Failed parsing message") } s.Logger.Info("IN: " + s.Handler.GetRemotePeer() + " " + string(msgJSON)) remotePeer := s.Handler.GetRemotePeer() // check message type, proceed accordingly if msgType, ok := msg["type"]; ok { switch msgType { case "error": break case "hello": // check if peer has already handshaked if !s.State.CheckForHandshake(remotePeer) { // check if hello request is valid var helloReq models.Hello err := helloReq.UnmarshalJSON(msgJSON) if err == nil { // finish handshake s.Logger.Info("Constructing handshake with " + remotePeer) var resp models.Hello resp.Construct() respJSON, _ := resp.MarshalJson() s.Handler.Output(respJSON) s.State.CompleteHandshake(remotePeer) s.Client.DiscoverPeers(remotePeer) } else { s.Handler.Fail("Hello request invalid") } } break case "getpeers": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } // create list of known peers, send to requester resp := s.State.FetchPeerListResponse() resp.Peers = append(resp.Peers, s.Handler.GetLocalPeer()) respJSON, _ := resp.MarshalJson() s.Handler.Output(respJSON) break case "peers": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } var peers models.PeerListResponse json.Unmarshal(msgJSON, &peers) s.State.ParsePeerListResponse(&peers) break case "ihaveobject": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } var obj models.ObjectWrapper err = obj.UnmarshalJSON(msgJSON) if err != nil { s.Handler.Fail("Could not parse request") break } if s.State.Transactions[obj.ObjectID] == nil { resp := models.ObjectWrapper{ Type: "getobject", ObjectID: obj.ObjectID, } respJSON, _ := resp.MarshalJson() s.Handler.Output(respJSON) } case "getobject": err = s.terminateIfNoHandshake(remotePeer) if err != nil { break } var obj models.ObjectWrapper err := obj.UnmarshalJSON(msgJSON) if err != nil { s.Handler.Fail("Could not parse request") break } transaction := s.State.GetTransaction(obj.ObjectID) if transaction != nil { s.Logger.Debug("Found transaction: " + obj.ObjectID) resp := json.RawMessage(`{"type":"object","object":` + transaction.String() + `}`) respJSON, _ := resp.MarshalJSON() s.Handler.Output(respJSON) } block := s.State.GetBlock(obj.ObjectID) if block != nil { s.Logger.Debug("Found block: " + obj.ObjectID) resp := json.RawMessage(`{"type":"object","object":` + block.String() + `}`) respJSON, _ := resp.MarshalJSON() s.Handler.Output(respJSON) } else { s.Handler.Error("Could not find object " + obj.ObjectID) } break case "object": err = s.terminateIfNoHandshake(remotePeer) if err != nil { break } var object models.Object err := object.UnmarshalJSON(msgJSON) if err != nil { s.Logger.Debug("Could not unmarshal: " + err.Error()) s.Handler.Fail("Object request invalid") break } val, err := object.GetObjectValue() if err != nil { s.Logger.Error("Could not parse object: " + err.Error()) s.Handler.Fail("Could not parse object request") break } switch val.GetType() { case "transaction": err := s.State.AppendTransaction(val.GetEntity().(*models.Transaction)) if err != nil { s.Logger.Debug("Failed appending transaction: " + err.Error()) s.Handler.Fail("Error appending transaction") break } break case "block": block := val.(*models.Block) go s.appendBlockToChain(block) break default: s.Logger.Debug("Something went wrong with " + string(msgJSON)) s.Handler.Fail("Could not parse object request") } break case "getchaintip": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } res := s.State.GetChainTip() var chaintip models.Chaintip chaintip.Construct(res.GetID()) respJSON, _ := chaintip.MarshalJson() s.Handler.Output(respJSON) break case "chaintip": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } var chaintip models.Chaintip err = chaintip.UnmarshalJSON(msgJSON) if err != nil { s.Logger.Debug("Could not unmarshal: " + err.Error()) s.Handler.Fail("Object request invalid") break } res := s.State.GetChainTip() if res.GetID() != chaintip.BlockID { s.Client.DiscoverObject(remotePeer, chaintip.BlockID) } break case "getmempool": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } res := s.State.GetMempoolTransactionIDs() var mempool models.Mempool mempool.Construct(res) respJSON, _ := mempool.MarshalJson() s.Handler.Output(respJSON) break case "mempool": err := s.terminateIfNoHandshake(remotePeer) if err != nil { break } var mempool models.Mempool err = mempool.UnmarshalJSON(msgJSON) if err != nil { s.Logger.Debug("Could not unmarshal: " + err.Error()) s.Handler.Fail("Object request invalid") break } for _, txid := range mempool.Txids { res := s.State.GetTransaction(txid) if res == nil { s.Client.DiscoverObject(remotePeer, txid) } } break default: s.Handler.Fail("Request type not supported " + msgType.(string)) } } else { s.Handler.Fail("Request type not specified") } } } func (s *Server) terminateIfNoHandshake(remotePeer string) error { if !s.State.CheckForHandshake(remotePeer) { s.Handler.Fail("Handshake not completed") return errors.New("handshake not completed with peer " + remotePeer) } return nil } func (s *Server) appendBlockToChain(block *models.Block) { dontAppend := false missingTx, err := s.State.GetMissingTransactionsInBlock(block) if err != nil { s.Logger.Debug("Failed appending block: " + err.Error()) s.Handler.Fail("Error appending block") return } if len(missingTx) != 0 { for _, txid := range missingTx { s.Client.DiscoverObject(s.Handler.GetRemotePeer(), txid) waited := 0 for s.State.GetTransaction(txid) == nil { s.Logger.Debug("Waiting for transactions " + txid) time.Sleep(500 * time.Millisecond) waited += 500 if waited*int(time.Millisecond) >= s.State.Config.ConnTimeout*int(time.Second) { s.Handler.Fail("Did not receive requested transactions in time, failed appending block") dontAppend = true return } } } } s.getMissingBlocks(block) if !dontAppend { err = s.State.AppendToChain(block) if err != nil { s.Logger.Debug("Failed appending block: " + err.Error()) s.Handler.Fail("Error appending block") } s.State.DumpBlockStore() for _, peer := range s.State.PeerList { if peer.Active { s.Client.GossipBlocks(peer.Name) } } } } func (s *Server) getMissingBlocks(block *models.Block) { blockId := *block.Previd if s.State.GetBlock(blockId) != nil { return } s.Logger.Debug("Block missing from chain: " + blockId) for _, peer := range s.State.PeerList { s.Client.DiscoverObject(peer.Name, blockId) } for s.State.GetBlock(blockId) == nil { s.Logger.Debug("Waiting for block " + blockId) time.Sleep(500 * time.Millisecond) } }