软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 研发管理 -> IBMopenblockchain学习(三) -> 正文阅读
研发管理 最新文章
拉格朗日乘数
maven之可视化项目依赖(Visualizingdepend
mac效率工具
Atitit.css规范bem项目中CSS的组织和管理
git入门
Asimplemodelfordescribingbasicsourcesofp
Linux进程管理浅析
我的openwrt学习笔记(十九):linux便捷开
2、微控制器选择
Git使用手册:为Git仓库创建Submodule

[研发管理]IBMopenblockchain学习(三)

  2016-04-02 21:01:20

Ledger是总账簿的意思,也就是blockchain中存储交易记录的部分。其代码包含如下,这块代码量大,可能分析时间会很长,希望读者耐心等待。

blockchain


先看下Blockchain在内存中保存的基本信息,Blockchain中的操作不是线程安全的
// blockchain holds the in-memory bookkeeping for the chain of blocks.
// Per the surrounding article, operations on it are not thread-safe.
type blockchain struct {
    size               uint64  // number of blocks in the chain; also used as the number of the next block
    previousBlockHash  []byte   // hash of the most recently committed block
    indexer            blockchainIndexer // block index implementation (synchronous or asynchronous)
    lastProcessedBlock *lastProcessedBlock  // block staged for persistence; reset to nil by blockPersistenceStatus
}

最后处理的块的结构
type lastProcessedBlock struct {
    block       *protos.Block
    blockNumber uint64  块号
    blockHash   []byte 块哈希值
}

newBlockchain


newBlockchain()用于创建区块链(blockchain)对象:先从数据库读取链的大小,若链非空则加载最后一个区块的哈希,最后启动索引器
// newBlockchain builds the in-memory blockchain from what is stored in the
// database: it reads the persisted size, loads the hash of the last block
// when the chain is non-empty, and then starts the indexer.
func newBlockchain() (*blockchain, error) {
    blockCount, err := fetchBlockchainSizeFromDB()
    if err != nil {
        return nil, err
    }

    chain := &blockchain{size: blockCount}
    if blockCount > 0 {
        // The last stored block is number blockCount-1.
        tip, err := fetchBlockFromDB(blockCount - 1)
        if err != nil {
            return nil, err
        }
        tipHash, err := tip.GetHash()
        if err != nil {
            return nil, err
        }
        chain.previousBlockHash = tipHash
    }

    if err := chain.startIndexer(); err != nil {
        return nil, err
    }
    return chain, nil
}

startIndexer()


创建索引
// startIndexer selects the synchronous or asynchronous indexer according to
// indexBlockDataSynchronously and starts it against this blockchain.
func (bc *blockchain) startIndexer() (err error) {
    if !indexBlockDataSynchronously {
        bc.indexer = newBlockchainIndexerAsync()
    } else {
        bc.indexer = newBlockchainIndexerSync()
    }
    return bc.indexer.start(bc)
}

getLastBlock


getLastBlock获取链上的最后一个区块;如果链为空,则返回nil
// getLastBlock returns the block with the highest number, or (nil, nil)
// when the chain is empty.
func (bc *blockchain) getLastBlock() (*protos.Block, error) {
    if bc.size > 0 {
        return bc.getBlock(bc.size - 1)
    }
    return nil, nil
}

getSize


getSize用于返回区块链的大小(即链上区块的数量)
// getSize reports the value of the in-memory size field.
func (bc *blockchain) getSize() uint64 {
    return bc.size
}

getBlock


在blockchain中通过任意高度获取块

// getBlock loads the block with the given number from the database.
func (bc *blockchain) getBlock(blockNumber uint64) (*protos.Block, error) {
    block, err := fetchBlockFromDB(blockNumber)
    return block, err
}

getBlockByHash


通过块的哈希获取块
// getBlockByHash resolves blockHash to a block number through the indexer
// and then loads that block.
func (bc *blockchain) getBlockByHash(blockHash []byte) (*protos.Block, error) {
    number, lookupErr := bc.indexer.fetchBlockNumberByBlockHash(blockHash)
    if lookupErr != nil {
        return nil, lookupErr
    }
    return bc.getBlock(number)
}

getTransactionByUUID


通过UUID获取交易记录
// getTransactionByUUID asks the indexer for the (block number, index) pair
// recorded for txUUID, loads that block and returns the transaction found
// at that index.
func (bc *blockchain) getTransactionByUUID(txUUID string) (*protos.Transaction, error) {
    blockNum, txPos, err := bc.indexer.fetchTransactionIndexByUUID(txUUID)
    if err != nil {
        return nil, err
    }
    containing, err := bc.getBlock(blockNum)
    if err != nil {
        return nil, err
    }
    return containing.GetTransactions()[txPos], nil
}

getTransactions


通过有块号标识的块获取所有的交易
// getTransactions returns every transaction of the block identified by
// blockNumber.
func (bc *blockchain) getTransactions(blockNumber uint64) ([]*protos.Transaction, error) {
    found, fetchErr := bc.getBlock(blockNumber)
    if fetchErr != nil {
        return nil, fetchErr
    }
    txs := found.GetTransactions()
    return txs, nil
}

getTransactionsByBlockHash


通过块的哈希获取所有的交易
// getTransactionsByBlockHash returns every transaction of the block
// identified by blockHash.
func (bc *blockchain) getTransactionsByBlockHash(blockHash []byte) ([]*protos.Transaction, error) {
    found, fetchErr := bc.getBlockByHash(blockHash)
    if fetchErr != nil {
        return nil, fetchErr
    }
    txs := found.GetTransactions()
    return txs, nil
}

getTransaction


通过块号和块内交易索引获取交易
// getTransaction returns the transaction at position txIndex inside the
// block identified by blockNumber.
func (bc *blockchain) getTransaction(blockNumber uint64, txIndex uint64) (*protos.Transaction, error) {
    found, fetchErr := bc.getBlock(blockNumber)
    if fetchErr != nil {
        return nil, fetchErr
    }
    txs := found.GetTransactions()
    return txs[txIndex], nil
}

getTransactionByBlockHash


通过块的哈希和块内交易索引获取交易
// getTransactionByBlockHash returns the transaction at position txIndex
// inside the block identified by blockHash.
func (bc *blockchain) getTransactionByBlockHash(blockHash []byte, txIndex uint64) (*protos.Transaction, error) {
    found, fetchErr := bc.getBlockByHash(blockHash)
    if fetchErr != nil {
        return nil, fetchErr
    }
    txs := found.GetTransactions()
    return txs[txIndex], nil
}

getBlockchainInfo


获取区块链的信息
// getBlockchainInfo summarises the chain: its height, the hash held in
// previousBlockHash (reported as the current block hash) and the
// PreviousBlockHash stored in the last block. An empty chain yields an
// info value with Height 0 only.
func (bc *blockchain) getBlockchainInfo() (*protos.BlockchainInfo, error) {
    height := bc.getSize()
    if height == 0 {
        return &protos.BlockchainInfo{Height: 0}, nil
    }

    tip, err := bc.getLastBlock()
    if err != nil {
        return nil, err
    }

    return &protos.BlockchainInfo{
        Height:            height,
        CurrentBlockHash:  bc.previousBlockHash,
        PreviousBlockHash: tip.PreviousBlockHash,
    }, nil
}

buildBlock


创建块
// buildBlock stamps block with the chain's current previousBlockHash and
// the supplied stateHash, then returns the same block pointer.
func (bc *blockchain) buildBlock(block *protos.Block, stateHash []byte) *protos.Block {
    prevHash := bc.previousBlockHash
    block.SetPreviousBlockHash(prevHash)
    block.StateHash = stateHash
    return block
}

addPersistenceChangesForNewBlock


对于新块添加持久性的更改
// addPersistenceChangesForNewBlock finalises block (previous hash, state
// hash, local commit timestamp) and queues the writes needed to persist it
// into writeBatch: the serialized block under its block-number key and the
// updated block count. With a synchronous indexer the index entries are
// queued in the same batch. The block is remembered in lastProcessedBlock;
// the in-memory size is left untouched here.
//
// It returns the number assigned to the new block. Any failure, including
// one reported by the synchronous indexer, is returned to the caller rather
// than dropped.
func (blockchain *blockchain) addPersistenceChangesForNewBlock(ctx context.Context,
    block *protos.Block, stateHash []byte, writeBatch *gorocksdb.WriteBatch) (uint64, error) {
    block = blockchain.buildBlock(block, stateHash)
    if block.NonHashData == nil {
        block.NonHashData = &protos.NonHashData{}
    }
    block.NonHashData.LocalLedgerCommitTimestamp = util.CreateUtcTimestamp()

    blockNumber := blockchain.size
    blockHash, err := block.GetHash()
    if err != nil {
        return 0, err
    }
    blockBytes, err := block.Bytes()
    if err != nil {
        return 0, err
    }

    writeBatch.PutCF(db.GetDBHandle().BlockchainCF, encodeBlockNumberDBKey(blockNumber), blockBytes)
    writeBatch.PutCF(db.GetDBHandle().BlockchainCF, blockCountKey, encodeUint64(blockNumber+1))
    if blockchain.indexer.isSynchronous() {
        // Surface indexing failures instead of committing a block without its index entries.
        if err := blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch); err != nil {
            return 0, err
        }
    }
    blockchain.lastProcessedBlock = &lastProcessedBlock{block, blockNumber, blockHash}
    return blockNumber, nil
}

blockPersistenceStatus


块持久状态
// blockPersistenceStatus is told whether the staged block was persisted.
// On success it advances size, adopts the staged block's hash as
// previousBlockHash and, for an asynchronous indexer, hands the block over
// for indexing. In every case the staged block is cleared afterwards.
func (bc *blockchain) blockPersistenceStatus(success bool) {
    if success {
        bc.size++
        staged := bc.lastProcessedBlock
        bc.previousBlockHash = staged.blockHash
        if async := !bc.indexer.isSynchronous(); async {
            bc.indexer.createIndexesAsync(staged.block, staged.blockNumber, staged.blockHash)
        }
    }
    bc.lastProcessedBlock = nil
}

persistRawBlock


持久化原始块
// persistRawBlock writes block to the database under blockNumber as-is.
//
// Blocks may arrive out of order during block/state synchronisation, so the
// stored block count is only raised when blockNumber+1 exceeds the current
// size (the count is really the chain height). The in-memory size is updated
// only after the database write has succeeded, so a failed write, hash
// computation or index creation leaves the blockchain object unchanged.
func (blockchain *blockchain) persistRawBlock(block *protos.Block, blockNumber uint64) error {
    blockBytes, err := block.Bytes()
    if err != nil {
        return err
    }
    blockHash, err := block.GetHash()
    if err != nil {
        return err
    }

    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    writeBatch.PutCF(db.GetDBHandle().BlockchainCF, encodeBlockNumberDBKey(blockNumber), blockBytes)

    newSize := blockNumber + 1
    growsChain := blockchain.getSize() < newSize
    if growsChain {
        writeBatch.PutCF(db.GetDBHandle().BlockchainCF, blockCountKey, encodeUint64(newSize))
    }

    if blockchain.indexer.isSynchronous() {
        if err := blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch); err != nil {
            return err
        }
    }

    opt := gorocksdb.NewDefaultWriteOptions()
    defer opt.Destroy()
    if err := db.GetDBHandle().DB.Write(opt, writeBatch); err != nil {
        return err
    }

    // Only now is the new height durable; reflect it in memory.
    if growsChain {
        blockchain.size = newSize
    }
    return nil
}

fetchBlockFromDB


从数据库中获取块
// fetchBlockFromDB reads the serialized block stored under blockNumber and
// unmarshals it. It returns (nil, nil) when nothing is stored under that key.
func fetchBlockFromDB(blockNumber uint64) (*protos.Block, error) {
    handle := db.GetDBHandle()
    raw, err := handle.GetFromBlockchainCF(encodeBlockNumberDBKey(blockNumber))
    switch {
    case err != nil:
        return nil, err
    case raw == nil:
        return nil, nil
    }
    return protos.UnmarshallBlock(raw)
}

fetchTransactionFromDB


从数据库中获取交易记录
// fetchTransactionFromDB loads block blockNum from the database and returns
// the transaction at position txIndex within it.
func fetchTransactionFromDB(blockNum uint64, txIndex uint64) (*protos.Transaction, error) {
    stored, fetchErr := fetchBlockFromDB(blockNum)
    if fetchErr != nil {
        return nil, fetchErr
    }
    txs := stored.GetTransactions()
    return txs[txIndex], nil
}

fetchBlockchainSizeFromDB


从数据库中获取区块链的大小
// fetchBlockchainSizeFromDB reads the value stored under blockCountKey and
// decodes it; a missing value is reported as size 0.
func fetchBlockchainSizeFromDB() (uint64, error) {
    raw, err := db.GetDBHandle().GetFromBlockchainCF(blockCountKey)
    switch {
    case err != nil:
        return 0, err
    case raw == nil:
        return 0, nil
    default:
        return decodeToUint64(raw), nil
    }
}

fetchBlockchainSizeFromSnapshot


从快照中获取区块链大小
// fetchBlockchainSizeFromSnapshot reads the value stored under blockCountKey
// as seen by snapshot and decodes it; a missing value is reported as 0.
func fetchBlockchainSizeFromSnapshot(snapshot *gorocksdb.Snapshot) (uint64, error) {
    raw, err := db.GetDBHandle().GetFromBlockchainCFSnapshot(snapshot, blockCountKey)
    if err != nil {
        return 0, err
    }
    if raw == nil {
        return 0, nil
    }
    return decodeToUint64(raw), nil
}

String


将区块链的信息以字符串形式输出
// String renders every block of the chain, in order, framed by header and
// footer lines carrying the block number. If any block cannot be fetched
// the result is the empty string.
func (bc *blockchain) String() string {
    var out bytes.Buffer
    for num, total := uint64(0), bc.getSize(); num < total; num++ {
        block, err := bc.getBlock(num)
        if err != nil {
            return ""
        }
        label := strconv.FormatUint(num, 10)
        out.WriteString("\n----------<block #")
        out.WriteString(label)
        out.WriteString(">----------\n")
        out.WriteString(block.String())
        out.WriteString("\n----------<\\block #")
        out.WriteString(label)
        out.WriteString(">----------\n")
    }
    return out.String()
}

blockchain_indexes


blockchainIndexer定义了以下几个接口
// blockchainIndexer is the contract between the blockchain and its index
// implementation.
type blockchainIndexer interface {
    // isSynchronous reports whether the indexer works synchronously.
    isSynchronous() bool
    // start starts the indexer for the given blockchain.
    start(blockchain *blockchain) error
    // createIndexesSync creates the index entries synchronously, using the supplied write batch.
    createIndexesSync(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error
    // createIndexesAsync creates the index entries asynchronously.
    createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error
    // fetchBlockNumberByBlockHash looks up a block number by block hash.
    fetchBlockNumberByBlockHash(blockHash []byte) (uint64, error)
    // fetchTransactionIndexByUUID looks up a transaction's location by UUID
    // (callers in this file treat the two values as block number and index within the block).
    fetchTransactionIndexByUUID(txUUID string) (uint64, uint64, error)
    // stop stops the indexer.
    stop()
}

addIndexDataForPersistence


持久化并且检索索引数据
// addIndexDataForPersistence queues the index entries for block into
// writeBatch: block hash -> block number, transaction UUID -> (block number,
// index in block), and (address, block number) -> list of transaction
// indexes. A map of address -> chaincode IDs is also collected for
// chaincode deploy/update transactions, but it is not written anywhere in
// this function. It always returns nil.
func addIndexDataForPersistence(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
    cf := db.GetDBHandle().IndexesCF

    // Block hash -> block number.
    indexLogger.Debug("Indexing block number [%d] by hash = [%x]", blockNumber, blockHash)
    writeBatch.PutCF(cf, encodeBlockHashKey(blockHash), encodeBlockNumber(blockNumber))

    txIndexesByAddress := make(map[string][]uint64)
    chaincodeIDsByAddress := make(map[string][]*protos.ChaincodeID)

    for i, tx := range block.GetTransactions() {
        pos := uint64(i)

        // Transaction UUID -> (block number, index within block).
        writeBatch.PutCF(cf, encodeTxUUIDKey(tx.Uuid), encodeBlockNumTxIndex(blockNumber, pos))

        executor := getTxExecutingAddress(tx)
        txIndexesByAddress[executor] = append(txIndexesByAddress[executor], pos)

        if tx.Type == protos.Transaction_CHAINCODE_NEW || tx.Type == protos.Transaction_CHAINCODE_UPDATE {
            addresses, chaincodeID := getAuthorisedAddresses(tx)
            for _, addr := range addresses {
                chaincodeIDsByAddress[addr] = append(chaincodeIDsByAddress[addr], chaincodeID)
            }
        }
    }

    for addr, positions := range txIndexesByAddress {
        writeBatch.PutCF(cf, encodeAddressBlockNumCompositeKey(addr, blockNumber), encodeListTxIndexes(positions))
    }
    return nil
}

getAuthorisedAddresses


获得授权地址
// getAuthorisedAddresses unmarshals tx.ChaincodeID into a ChaincodeID and
// returns it together with a fixed pair of placeholder addresses. If the
// unmarshal fails both results are nil.
func getAuthorisedAddresses(tx *protos.Transaction) ([]string, *protos.ChaincodeID) {
    chaincodeID := new(protos.ChaincodeID)
    if err := proto.Unmarshal(tx.ChaincodeID, chaincodeID); err != nil {
        return nil, nil
    }
    addresses := []string{"address1", "address2"}
    return addresses, chaincodeID
}

encodeBlockNumber


编码/解码数据库键/值函数,索引数据编码/解码块数

// encodeBlockNumber encodes blockNumber with proto.EncodeVarint.
func encodeBlockNumber(blockNumber uint64) []byte {
    encoded := proto.EncodeVarint(blockNumber)
    return encoded
}
// decodeBlockNumber decodes a varint from blockNumberBytes; the
// consumed-length result of proto.DecodeVarint is discarded.
func decodeBlockNumber(blockNumberBytes []byte) (blockNumber uint64) {
    decoded, _ := proto.DecodeVarint(blockNumberBytes)
    return decoded
}

encodeBlockNumTxIndex


对 块号的Tx索引进行编码/解码
// encodeBlockNumTxIndex encodes blockNumber followed by txIndexInBlock as
// two consecutive varints.
func encodeBlockNumTxIndex(blockNumber uint64, txIndexInBlock uint64) []byte {
    buf := proto.NewBuffer([]byte{})
    for _, v := range [...]uint64{blockNumber, txIndexInBlock} {
        buf.EncodeVarint(v)
    }
    return buf.Bytes()
}

// decodeBlockNumTxIndex reads two consecutive varints from bytes: the block
// number and then the transaction index. Decoding stops at the first error.
func decodeBlockNumTxIndex(bytes []byte) (blockNum uint64, txIndex uint64, err error) {
    buf := proto.NewBuffer(bytes)
    if blockNum, err = buf.DecodeVarint(); err != nil {
        return blockNum, txIndex, err
    }
    txIndex, err = buf.DecodeVarint()
    return blockNum, txIndex, err
}

对区块哈希的键值进行编码

// encodeBlockHashKey builds the index key for a block hash by prefixing it
// with prefixBlockHashKey.
func encodeBlockHashKey(blockHash []byte) []byte {
    key := prependKeyPrefix(prefixBlockHashKey, blockHash)
    return key
}

对TxUUID的键值进行编码
// encodeTxUUIDKey builds the index key for a transaction UUID by prefixing
// its bytes with prefixTxUUIDKey.
func encodeTxUUIDKey(txUUID string) []byte {
    uuidBytes := []byte(txUUID)
    return prependKeyPrefix(prefixTxUUIDKey, uuidBytes)
}

对区块号地址的复合键值进行编码
func encodeAddressBlockNumCompositeKey(address string, blockNumber uint64) []byte {
    b := proto.NewBuffer([]byte{prefixAddressBlockNumCompositeKey})
    b.EncodeRawBytes([]byte(address))
    b.EncodeVarint(blockNumber)
    return b.Bytes()
}

对Tx的索引清单进行编码
// encodeListTxIndexes encodes every element of listTx, in order, as a
// varint and returns the concatenated bytes.
func encodeListTxIndexes(listTx []uint64) []byte {
    buf := proto.NewBuffer([]byte{})
    for _, txIndex := range listTx {
        buf.EncodeVarint(txIndex)
    }
    return buf.Bytes()
}

对chaincode的ID进行编码
// encodeChaincodeID is a stub: it ignores c and returns an empty, non-nil
// byte slice. Serialization of the chaincode ID is not implemented here.
func encodeChaincodeID(c *protos.ChaincodeID) []byte {
    return make([]byte, 0)
}

前置键值前缀
// prependKeyPrefix returns a new slice consisting of prefix followed by the
// bytes of key. The input slice is not modified.
func prependKeyPrefix(prefix byte, key []byte) []byte {
    prefixed := make([]byte, 0, len(key)+1)
    prefixed = append(prefixed, prefix)
    return append(prefixed, key...)
}

blockchain_indexes_async


整个代码主要执行对blockchain的异步创建索引
// blockchainIndexerAsync is the indexer variant that builds the blockchain
// indexes asynchronously.
type blockchainIndexerAsync struct {
    blockchain *blockchain
    // Channel for transferring blocks from the blockchain to the indexer.
    blockChan    chan blockWrapper
    indexerState *blockchainIndexerState
}

createIndexesInternal


创建索引条目并逐步添加到数据库,用于创建各种属性的索引
// createIndexesInternal builds the index entries for block, records
// blockNumber under lastIndexedBlockKey, and writes both to the database in
// a single batch. Only after a successful write is the indexer state told
// that blockNumber has been indexed.
//
// An error from building the index entries or from the database write is
// returned and nothing is marked as indexed.
func (indexer *blockchainIndexerAsync) createIndexesInternal(block *protos.Block, blockNumber uint64, blockHash []byte) error {
    openchainDB := db.GetDBHandle()
    writeBatch := gorocksdb.NewWriteBatch()
    defer writeBatch.Destroy()
    if err := addIndexDataForPersistence(block, blockNumber, blockHash, writeBatch); err != nil {
        return err
    }
    writeBatch.PutCF(openchainDB.IndexesCF, lastIndexedBlockKey, encodeBlockNumber(blockNumber))
    opt := gorocksdb.NewDefaultWriteOptions()
    defer opt.Destroy()
    if err := openchainDB.DB.Write(opt, writeBatch); err != nil {
        return err
    }
    indexer.indexerState.blockIndexed(blockNumber)
    return nil
}

indexPendingBlocks


待定块的索引
// indexPendingBlocks indexes every committed block whose number is greater
// than the last indexed block number reported by the indexer state, in
// ascending order. It does nothing when the chain is empty or already fully
// indexed.
//
// The first failure (fetching a block, hashing it, or writing its index
// entries) aborts the loop and is returned, so a block is never skipped
// silently.
func (indexer *blockchainIndexerAsync) indexPendingBlocks() error {
    blockchain := indexer.blockchain
    if blockchain.getSize() == 0 {
        // The chain is empty so far.
        return nil
    }

    lastCommittedBlockNum := blockchain.getSize() - 1
    lastIndexedBlockNum := indexer.indexerState.getLastIndexedBlockNumber()
    if lastCommittedBlockNum == lastIndexedBlockNum {
        // Every committed block is already indexed.
        return nil
    }

    for ; lastIndexedBlockNum < lastCommittedBlockNum; lastIndexedBlockNum++ {
        blockNumToIndex := lastIndexedBlockNum + 1
        blockToIndex, errBlockFetch := blockchain.getBlock(blockNumToIndex)
        if errBlockFetch != nil {
            return errBlockFetch
        }

        blockHash, errBlockHash := blockToIndex.GetHash()
        if errBlockHash != nil {
            return errBlockHash
        }
        if errIndex := indexer.createIndexesInternal(blockToIndex, blockNumToIndex, blockHash); errIndex != nil {
            return errIndex
        }
    }
    return nil
}

blockIndexed


块索引
// blockIndexed records, under the newBlockIndexed lock, that blockNumber
// has been indexed (which also marks the zeroth block as indexed) and wakes
// every goroutine waiting on newBlockIndexed.
func (s *blockchainIndexerState) blockIndexed(blockNumber uint64) {
    s.newBlockIndexed.L.Lock()
    defer s.newBlockIndexed.L.Unlock()

    s.zerothBlockIndexed = true
    s.lastBlockIndexed = blockNumber
    s.newBlockIndexed.Broadcast()
}

waitForLastCommittedBlock


等待最后一个块的创建
// waitForLastCommittedBlock blocks until the index has caught up with the
// block that was last in the chain when the call started. It returns
// immediately for an empty chain. The returned error is always nil; the
// named result is kept for interface compatibility.
//
// Both waits re-check their condition in a loop after every wake-up, as
// sync.Cond requires: returning from Wait does not by itself guarantee that
// the awaited condition holds.
func (indexerState *blockchainIndexerState) waitForLastCommittedBlock() (err error) {
    chain := indexerState.indexer.blockchain
    if chain.getSize() == 0 {
        return nil
    }

    lastBlockCommitted := chain.getSize() - 1

    indexerState.newBlockIndexed.L.Lock()
    defer indexerState.newBlockIndexed.L.Unlock()

    for !indexerState.zerothBlockIndexed {
        indexLogger.Debug(
            "Waiting for zeroth block to be indexed. lastBlockCommitted=[%d] and lastBlockIndexed=[%d]",
            lastBlockCommitted, indexerState.lastBlockIndexed)
        indexerState.newBlockIndexed.Wait()
    }

    for indexerState.lastBlockIndexed < lastBlockCommitted {
        indexLogger.Debug(
            "Waiting for index to catch up with block chain. lastBlockCommitted=[%d] and lastBlockIndexed=[%d]",
            lastBlockCommitted, indexerState.lastBlockIndexed)
        indexerState.newBlockIndexed.Wait()
    }
    return nil
}

fetchLastIndexedBlockNumFromDB


从数据库中读取最后一个已建立索引的块号;如果数据库中还没有该记录,则zerothBlockIndexed为false
// fetchLastIndexedBlockNumFromDB reads the value stored under
// lastIndexedBlockKey in the indexes column family. When a value exists it
// is decoded and zerothBlockIndexed is true; when none exists, or the read
// fails, the zero values are returned along with any error.
func fetchLastIndexedBlockNumFromDB() (zerothBlockIndexed bool, lastIndexedBlockNum uint64, err error) {
    raw, readErr := db.GetDBHandle().GetFromIndexesCF(lastIndexedBlockKey)
    if readErr != nil || raw == nil {
        return false, 0, readErr
    }
    return true, decodeBlockNumber(raw), nil
}

ledger


先看下ledger的结构
// Ledger ties together the blockchain and the state, and tracks the ID of
// the batch currently in progress.
type Ledger struct {
    blockchain *blockchain // the blockchain
    state      *state.State // the state
    currentID  interface{} // current batch ID; assigned by BeginTxBatch and ApplyStateDelta
}

GetLedger


给出”单个“ledger的引用
// GetLedger returns the package-wide Ledger. The ledger is created by
// newLedger on the first call only (guarded by once); later calls return the
// same ledger and the same creation error.
func GetLedger() (*Ledger, error) {
    create := func() {
        ledger, ledgerError = newLedger()
    }
    once.Do(create)
    return ledger, ledgerError
}

BeginTxBatch


开始一个新的交易批次(id用于标识该批次)
// BeginTxBatch opens a batch identified by id. It fails with the error from
// checkValidIDBegin when that check rejects the call; otherwise id becomes
// the ledger's current ID.
func (l *Ledger) BeginTxBatch(id interface{}) error {
    if err := l.checkValidIDBegin(); err != nil {
        return err
    }
    l.currentID = id
    return nil
}

GetTXBatchPreviewBlock


返回一个区块预览:如果随后使用相同的参数调用ledger.CommitTxBatch,提交到数据库的区块将与该预览区块具有相同的哈希。如果在这两次调用之间状态被某个交易修改,则哈希会不同。该区块预览不包含不参与哈希计算的数据(如本地时间戳)。
// GetTXBatchPreviewBlock validates id, takes the current state hash and
// returns a new block built from transactions and metadata with that state
// hash and the chain's previous block hash applied. Nothing is persisted.
func (l *Ledger) GetTXBatchPreviewBlock(id interface{},
    transactions []*protos.Transaction, metadata []byte) (*protos.Block, error) {
    if err := l.checkValidIDCommitORRollback(id); err != nil {
        return nil, err
    }
    stateHash, hashErr := l.state.GetHash()
    if hashErr != nil {
        return nil, hashErr
    }
    preview := protos.NewBlock(transactions, metadata)
    return l.blockchain.buildBlock(preview, stateHash), nil
}

CommitTxBatch


CommitTxBatch在当前交易批次需要提交时被调用。只有当交易细节以及(该交易批次执行过程中可能产生的)状态变化都已写入持久化存储时,该函数才会成功返回
// CommitTxBatch commits the batch identified by id: it builds a block from
// transactions, transactionResults and metadata, queues the block and the
// state changes into one write batch and writes it to the database.
//
// If computing the state hash, staging the block, or the database write
// fails, the ledger is reset with resetForNextTxGroup(false), the blockchain
// is told the block was not persisted, and the error is returned. On success
// both are notified with true and a block event is sent.
func (l *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Transaction, transactionResults []*protos.TransactionResult, metadata []byte) error {
    if err := l.checkValidIDCommitORRollback(id); err != nil {
        return err
    }

    // abort undoes the in-memory batch bookkeeping and passes the cause through.
    abort := func(cause error) error {
        l.resetForNextTxGroup(false)
        l.blockchain.blockPersistenceStatus(false)
        return cause
    }

    stateHash, err := l.state.GetHash()
    if err != nil {
        return abort(err)
    }

    batch := gorocksdb.NewWriteBatch()
    defer batch.Destroy()
    block := protos.NewBlock(transactions, metadata)
    block.NonHashData = &protos.NonHashData{TransactionResults: transactionResults}
    blockNumber, err := l.blockchain.addPersistenceChangesForNewBlock(context.TODO(), block, stateHash, batch)
    if err != nil {
        return abort(err)
    }
    l.state.AddChangesForPersistence(blockNumber, batch)

    writeOpts := gorocksdb.NewDefaultWriteOptions()
    defer writeOpts.Destroy()
    if dbErr := db.GetDBHandle().DB.Write(writeOpts, batch); dbErr != nil {
        return abort(dbErr)
    }

    l.resetForNextTxGroup(true)
    l.blockchain.blockPersistenceStatus(true)

    sendProducerBlockEvent(block)
    return nil
}

RollbackTxBatch


批处理回滚时放弃当前事务批次执行过程中可能发生的所有状态变化
// RollbackTxBatch abandons the batch identified by id: after validating id
// it resets the ledger with resetForNextTxGroup(false).
func (l *Ledger) RollbackTxBatch(id interface{}) error {
    ledgerLogger.Debug("RollbackTxBatch for id = [%s]", id)
    if err := l.checkValidIDCommitORRollback(id); err != nil {
        return err
    }
    l.resetForNextTxGroup(false)
    return nil
}

TxBegin


标志着在持续一批新的交易开始
// TxBegin forwards the start of transaction txUUID to the state.
func (l *Ledger) TxBegin(txUUID string) {
    l.state.TxBegin(txUUID)
}

TxFinished


标志着正在进行的交易已经完成。如果txSuccessful为false,则丢弃该交易产生的状态变化
// TxFinished forwards the completion of transaction txUUID, together with
// its success flag, to the state.
func (l *Ledger) TxFinished(txUUID string, txSuccessful bool) {
    l.state.TxFinish(txUUID, txSuccessful)
}

GetTempStateHash


计算哈希状态并考虑到当前事务批次执行过程中可能发生的状态变化
// GetTempStateHash returns the hash currently reported by the state.
func (l *Ledger) GetTempStateHash() ([]byte, error) {
    hash, err := l.state.GetHash()
    return hash, err
}

GetTempStateHashWithTxDeltaStateHashes


除状态哈希(见方法GetTempStateHash的定义)之外,
此方法还返回一个映射[交易的txUuid -> cryptoHash(该交易所做的状态变更)],只有执行成功的交易才会出现在该映射中
// GetTempStateHashWithTxDeltaStateHashes returns the state hash, the
// per-transaction delta hashes reported by the state, and the error from
// computing the state hash.
func (l *Ledger) GetTempStateHashWithTxDeltaStateHashes() ([]byte, map[string][]byte, error) {
    stateHash, hashErr := l.state.GetHash()
    deltaHashes := l.state.GetTxStateDeltaHash()
    return stateHash, deltaHashes, hashErr
}

GetState


根据chaincodeID和键获取对应的状态值。如果committed为false,先在内存中查找,找不到时再从数据库中获取;如果committed为true,则只从数据库中获取。
// GetState returns what the state holds for key under chaincodeID; the
// committed flag is passed through to the state unchanged.
func (l *Ledger) GetState(chaincodeID string, key string, committed bool) ([]byte, error) {
    value, err := l.state.Get(chaincodeID, key, committed)
    return value, err
}

GetStateRangeScanIterator


返回一个迭代器,用于获取指定chaincodeID下startKey和endKey之间的所有键(和值)(假设键按字典序排列)。如果committed为true,键值只从数据库中检索;如果committed为false,则将数据库中的结果与内存中的结果合并(优先采用内存中的数据)。
返回的迭代器中的键值不保证按任何特定顺序排列。
// GetStateRangeScanIterator returns the state's range-scan iterator for
// chaincodeID between startKey and endKey; the committed flag is passed
// through unchanged.
func (l *Ledger) GetStateRangeScanIterator(chaincodeID string, startKey string, endKey string, committed bool) (statemgmt.RangeScanIterator, error) {
    iterator, err := l.state.GetRangeScanIterator(chaincodeID, startKey, endKey, committed)
    return iterator, err
}

GetStateSnapshot


返回当前区块对应的全局状态在某一时间点的快照视图,用于在节点(peer)之间传输状态。快照使用完毕后必须调用stateSnapshot.Release()以释放资源。
// GetStateSnapshot takes a database snapshot, reads the blockchain size as
// seen by that snapshot and returns the state snapshot for the last block
// (size-1). If the size cannot be read, or the chain has no blocks, the
// database snapshot is released and an error is returned.
func (l *Ledger) GetStateSnapshot() (*state.StateSnapshot, error) {
    snapshot := db.GetDBHandle().GetSnapshot()
    height, err := fetchBlockchainSizeFromSnapshot(snapshot)
    if err == nil && height == 0 {
        err = fmt.Errorf("Blockchain has no blocks, cannot determine block number")
    }
    if err != nil {
        snapshot.Release()
        return nil, err
    }
    return l.state.GetSnapshot(height-1, snapshot)
}

GetStateDelta


如果可用,则返回指定块的状态增量。
// GetStateDelta fetches the state delta stored for blockNumber. It returns
// ErrOutOfBounds when blockNumber is not below the blockchain size.
func (l *Ledger) GetStateDelta(blockNumber uint64) (*statemgmt.StateDelta, error) {
    if blockNumber < l.GetBlockchainSize() {
        return l.state.FetchStateDeltaFromDB(blockNumber)
    }
    return nil, ErrOutOfBounds
}

ApplyStateDelta


ApplyStateDelta将一个状态增量应用到当前状态上。这只是内存中的改变,必须调用ledger.CommitStateDelta才能持久化到数据库。该函数只应作为状态同步的一部分使用。状态增量可以通过Ledger.GetStateDelta函数从另一个节点(peer)获取,也可以用Ledger.GetStateSnapshot()取得的键来创建。示例见ledger_test.go中定义的TestSetRawState。请注意,此函数不做任何顺序检查,调用者必须自行保证增量按正确的顺序应用。例如,如果当前处于块8,却用Ledger.GetStateDelta(10)取得的增量调用此函数,状态就会出错,因为没有应用块9的增量。可以通过stateDelta.RollBackwards让状态向前或向后滚动。默认情况下,为块3检索到的增量可用于把状态从块2向前滚动到块3。
如果stateDelta.RollBackwards=true,则为块3检索到的增量可用于把状态从块3向后回滚到块2。
// ApplyStateDelta applies delta to the state under the batch ID id. It
// fails with the error from checkValidIDBegin when that check rejects the
// call; otherwise id becomes the ledger's current ID before the delta is
// handed to the state.
func (l *Ledger) ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error {
    if err := l.checkValidIDBegin(); err != nil {
        return err
    }
    l.currentID = id
    l.state.ApplyStateDelta(delta)
    return nil
}

CommitStateDelta


将提交ledger.ApplyState状态增量并传递到到数据库
// CommitStateDelta validates id and asks the state to commit the applied
// delta. Once the check has passed, the ledger is always reset with
// resetForNextTxGroup(true) on the way out, whatever the commit returns.
func (l *Ledger) CommitStateDelta(id interface{}) error {
    if err := l.checkValidIDCommitORRollback(id); err != nil {
        return err
    }
    defer l.resetForNextTxGroup(true)
    return l.state.CommitStateDelta()
}

RollbackStateDelta


放弃到ledger.ApplyStateDelta状态增量
// RollbackStateDelta validates id and then discards the applied delta by
// resetting the ledger with resetForNextTxGroup(false).
func (l *Ledger) RollbackStateDelta(id interface{}) error {
    if err := l.checkValidIDCommitORRollback(id); err != nil {
        return err
    }
    l.resetForNextTxGroup(false)
    return nil
}

VerifyChain


VerifyChain用于验证blockchain的完整性(integrity)。
验证方法是:确保每个块中存储的"前一个块的哈希"与链中前一个块的实际哈希相匹配。返回值是存有不匹配的前块哈希的那个块的块号。例如,调用VerifyChain(99, 0)时,如果块8、32和42中存储的前块哈希与各自前一个块的实际哈希不匹配,则函数返回42。highBlock是参与验证的最高块;如果要验证整条链,使用ledger.GetBlockchainSize() - 1。lowBlock是参与验证的最低块;如果要验证整条链,使用0(即创世区块)。
// VerifyChain walks the chain downwards from highBlock to lowBlock+1 and
// checks that the PreviousBlockHash stored in each block equals the hash
// computed from the block before it.
//
// It returns the number of the first (highest) block whose stored previous
// hash does not match, with a nil error. When every link matches it returns
// (0, nil). ErrOutOfBounds is returned when highBlock is not below the
// blockchain size or when highBlock <= lowBlock. When a block cannot be
// fetched, is nil, or cannot be hashed, the number of that block is returned
// together with an error naming it.
func (ledger *Ledger) VerifyChain(highBlock, lowBlock uint64) (uint64, error) {
    if highBlock >= ledger.GetBlockchainSize() {
        return highBlock, ErrOutOfBounds
    }
    if highBlock <= lowBlock {
        return lowBlock, ErrOutOfBounds
    }

    for i := highBlock; i > lowBlock; i-- {
        currentBlock, err := ledger.GetBlockByNumber(i)
        if err != nil {
            return i, fmt.Errorf("Error fetching block %d.", i)
        }
        if currentBlock == nil {
            return i, fmt.Errorf("Block %d is nil.", i)
        }
        previousBlock, err := ledger.GetBlockByNumber(i - 1)
        if err != nil {
            // Name the block that actually failed to load (i-1, not i).
            return i - 1, fmt.Errorf("Error fetching block %d.", i-1)
        }
        if previousBlock == nil {
            return i - 1, fmt.Errorf("Block %d is nil.", i-1)
        }

        previousBlockHash, err := previousBlock.GetHash()
        if err != nil {
            return i - 1, fmt.Errorf("Error calculating block hash for block %d.", i-1)
        }
        if !bytes.Equal(previousBlockHash, currentBlock.PreviousBlockHash) {
            return i, nil
        }
    }

    return 0, nil
}

sendProducerBlockEvent

// sendProducerBlockEvent publishes a block event for block. Before sending,
// the CodePackage of every chaincode-deployment transaction
// (Transaction_CHAINCODE_NEW) is removed from its payload so that the event
// stays small; the payload is rewritten in place on the transaction. A
// transaction whose payload cannot be unmarshalled or re-marshalled is
// logged and left untouched.
func sendProducerBlockEvent(block *protos.Block) {
    for _, tx := range block.GetTransactions() {
        if tx.Type != protos.Transaction_CHAINCODE_NEW {
            continue
        }

        spec := &protos.ChaincodeDeploymentSpec{}
        if err := proto.Unmarshal(tx.Payload, spec); err != nil {
            ledgerLogger.Error(fmt.Sprintf("Error unmarshalling deployment transaction for block event: %s", err))
            continue
        }

        // Drop the potentially very large code package.
        spec.CodePackage = nil
        trimmed, err := proto.Marshal(spec)
        if err != nil {
            ledgerLogger.Error(fmt.Sprintf("Error marshalling deployment transaction for block event: %s", err))
            continue
        }
        tx.Payload = trimmed
    }

    event := producer.CreateBlockEvent(block)
    producer.Send(event)
}

genesis

genesis


类似于chaincode,调用go-logging中logging库的MustGetLogger函数对genesis package进行记录
// genesisLogger is the logger obtained from logging.MustGetLogger under the name "genesis".
var genesisLogger = logging.MustGetLogger("genesis")

MakeGenesis


MakeGenesis基于在openchain.yaml中配置创建创世区块,并把它添加到blockchain。
// MakeGenesis creates the genesis block from the chaincodes listed under the
// "ledger.blockchain.genesisBlock" configuration key and commits it to the
// ledger. The work runs at most once (guarded by once); every call returns
// the error recorded by that single run.
//
// Nothing is done when the blockchain already contains blocks. Each
// configured chaincode is deployed locally and the resulting transactions
// form the genesis block.
//
// Failures to open or commit the transaction batch are recorded and
// returned, and a non-string constructor argument is reported as a
// configuration error instead of causing a panic.
func MakeGenesis() error {
    once.Do(func() {
        ledger, err := ledger.GetLedger()
        if err != nil {
            makeGenesisError = err
            return
        }

        if ledger.GetBlockchainSize() > 0 {
            // A non-empty blockchain means the genesis block already exists.
            return
        }

        genesisLogger.Info("Creating genesis block.")

        if beginErr := ledger.BeginTxBatch(0); beginErr != nil {
            makeGenesisError = beginErr
            return
        }
        var genesisTransactions []*protos.Transaction
        // Deploying the validity-period chaincode is disabled for now and must
        // not be switchable from configuration, hence the hard-coded false.
        allowDeployValidityPeriod := false

        if deploySystemChaincodeEnabled() && allowDeployValidityPeriod {
            vpTransaction, deployErr := deployUpdateValidityPeriodChaincode()

            if deployErr != nil {
                genesisLogger.Error("Error deploying validity period system chaincode for genesis block.", deployErr)
                makeGenesisError = deployErr
                return
            }

            genesisTransactions = append(genesisTransactions, vpTransaction)
        }

        genesis := viper.GetStringMap("ledger.blockchain.genesisBlock")

        if genesis == nil {
            genesisLogger.Info("No genesis block chaincodes defined.")
        } else {

            chaincodes, chaincodesOK := genesis["chaincode"].([]interface{})
            if !chaincodesOK {
                genesisLogger.Info("No genesis block chaincodes defined.")
                makeGenesisError = ledger.CommitTxBatch(0, genesisTransactions, nil, nil)
                return
            }

            genesisLogger.Debug("Genesis chaincodes are %s", chaincodes)

            for i := 0; i < len(chaincodes); i++ {
                genesisLogger.Debug("Chaincode %d is %s", i, chaincodes[i])

                chaincodeMap, chaincodeMapOK := chaincodes[i].(map[interface{}]interface{})
                if !chaincodeMapOK {
                    genesisLogger.Error("Invalid chaincode defined in genesis configuration:", chaincodes[i])
                    makeGenesisError = fmt.Errorf("Invalid chaincode defined in genesis configuration: %s", chaincodes[i])
                    return
                }

                path, pathOK := chaincodeMap["path"].(string)
                if !pathOK {
                    genesisLogger.Error("Invalid chaincode URL defined in genesis configuration:", chaincodeMap["path"])
                    makeGenesisError = fmt.Errorf("Invalid chaincode URL defined in genesis configuration: %s", chaincodeMap["path"])
                    return
                }

                chaincodeType, chaincodeTypeOK := chaincodeMap["type"].(string)
                if !chaincodeTypeOK {
                    genesisLogger.Error("Invalid chaincode type defined in genesis configuration:", chaincodeMap["type"])
                    makeGenesisError = fmt.Errorf("Invalid chaincode type defined in genesis configuration: %s", chaincodeMap["type"])
                    return
                }

                chaincodeID := &protos.ChaincodeID{Path: path, Name: ""}

                genesisLogger.Debug("Genesis chaincodeID %s", chaincodeID)
                genesisLogger.Debug("Genesis chaincode type %s", chaincodeType)

                constructorMap, constructorMapOK := chaincodeMap["constructor"].(map[interface{}]interface{})
                if !constructorMapOK {
                    genesisLogger.Error("Invalid chaincode constructor defined in genesis configuration:", chaincodeMap["constructor"])
                    makeGenesisError = fmt.Errorf("Invalid chaincode constructor defined in genesis configuration: %s", chaincodeMap["constructor"])
                    return
                }

                specType := protos.ChaincodeSpec_Type(protos.ChaincodeSpec_Type_value[chaincodeType])

                var spec protos.ChaincodeSpec
                if constructorMap == nil {
                    genesisLogger.Debug("Genesis chaincode has no constructor.")
                    spec = protos.ChaincodeSpec{Type: specType, ChaincodeID: chaincodeID}
                } else {

                    ctorFunc, ctorFuncOK := constructorMap["func"].(string)
                    if !ctorFuncOK {
                        genesisLogger.Error("Invalid chaincode constructor function defined in genesis configuration:", constructorMap["func"])
                        makeGenesisError = fmt.Errorf("Invalid chaincode constructor function defined in genesis configuration: %s", constructorMap["func"])
                        return
                    }

                    ctorArgs, ctorArgsOK := constructorMap["args"].([]interface{})
                    if !ctorArgsOK {
                        genesisLogger.Error("Invalid chaincode constructor args defined in genesis configuration:", constructorMap["args"])
                        makeGenesisError = fmt.Errorf("Invalid chaincode constructor args defined in genesis configuration: %s", constructorMap["args"])
                        return
                    }

                    genesisLogger.Debug("Genesis chaincode constructor func %s", ctorFunc)
                    genesisLogger.Debug("Genesis chaincode constructor args %s", ctorArgs)
                    var ctorArgsStringArray []string
                    for j := 0; j < len(ctorArgs); j++ {
                        // Every argument must be a string; reject anything else
                        // as a configuration error rather than panicking.
                        arg, argOK := ctorArgs[j].(string)
                        if !argOK {
                            genesisLogger.Error("Invalid chaincode constructor args defined in genesis configuration:", constructorMap["args"])
                            makeGenesisError = fmt.Errorf("Invalid chaincode constructor args defined in genesis configuration: %s", constructorMap["args"])
                            return
                        }
                        ctorArgsStringArray = append(ctorArgsStringArray, arg)
                    }

                    spec = protos.ChaincodeSpec{Type: specType, ChaincodeID: chaincodeID, CtorMsg: &protos.ChaincodeInput{Function: ctorFunc, Args: ctorArgsStringArray}}
                }

                transaction, _, deployErr := DeployLocal(context.Background(), &spec)
                if deployErr != nil {
                    genesisLogger.Error("Error deploying chaincode for genesis block.", deployErr)
                    makeGenesisError = deployErr
                    return
                }

                genesisTransactions = append(genesisTransactions, transaction)

            }

        }

        genesisLogger.Info("Adding %d system chaincodes to the genesis block.", len(genesisTransactions))
        makeGenesisError = ledger.CommitTxBatch(0, genesisTransactions, nil, nil)

    })
    return makeGenesisError
}

statemgmt

util

EncodeOrderPreservingVarUint64


返回一个uint64数的字节表示:去掉开头所有全零的字节,以缩短数组长度。为了在默认的字节比较下保持顺序,第一个字节存放其后剩余字节的个数。有了这个首字节,返回的字节还可以作为其它更大字节数组的一部分使用,例如数据库中的复合键
// EncodeOrderPreservingVarUint64 encodes number as a length byte followed by
// its big-endian bytes with all leading zero bytes removed. The length byte
// holds how many bytes follow (0 to 8), so zero encodes as a single byte.
// The function panics if the varint-encoded length needs more than one byte.
func EncodeOrderPreservingVarUint64(number uint64) []byte {
    var raw [8]byte
    binary.BigEndian.PutUint64(raw[:], number)

    // Count the leading zero bytes to skip.
    skip := 0
    for skip < len(raw) && raw[skip] == 0x00 {
        skip++
    }
    size := len(raw) - skip

    sizeBytes := proto.EncodeVarint(uint64(size))
    if len(sizeBytes) > 1 {
        panic(fmt.Errorf("[]sizeBytes should not be more than one byte because the max number it needs to hold is 8. size=%d", size))
    }

    encoded := make([]byte, size+1)
    encoded[0] = sizeBytes[0]
    copy(encoded[1:], raw[skip:])
    return encoded
}

DecodeOrderPreservingVarUint64


解码从由方法“EncodeOrderPreservingVarUint64’得到的字节数。
此外,返回在该过程中所消耗的字节数
// DecodeOrderPreservingVarUint64 reverses EncodeOrderPreservingVarUint64:
// it reads the length from the start of bytes, right-aligns that many
// following bytes in an 8-byte big-endian buffer and returns the decoded
// number together with the number of bytes consumed (length + 1).
func DecodeOrderPreservingVarUint64(bytes []byte) (uint64, int) {
    encodedSize, _ := proto.DecodeVarint(bytes)
    size := int(encodedSize)

    var padded [8]byte
    copy(padded[8-size:], bytes[1:size+1])

    return binary.BigEndian.Uint64(padded[:]), size + 1
}

上一篇文章      下一篇文章      查看所有文章
2016-04-02 21:01:17  
360图书馆 论文大全 母婴/育儿 软件开发资料 网页快照 文字转语音 购物精选 软件 美食菜谱 新闻中心 电影下载 小游戏 Chinese Culture
生肖星座解梦 人民的名义 人民的名义在线看 三沣玩客 拍拍 视频 开发 Android开发 站长 古典小说 网文精选 搜图网 天下美图
中国文化英文 多播视频 装修知识库
2017-4-23 22:01:28
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --